Documentation for Controllers

Controllers in ViMMS implement fragmentation strategies, determining which ions in an MS1 (survey) scan should be fragmented. A standard fragmentation strategy used in data-dependant acquisition (DDA) is the Top-N strategy, where the top N most intense ions in the survey scan are fragmented. This strategy is implemented in the TopNController in ViMMS.

In addition to Top-N, several other DDA strategies have been implemented that improve upon the standard TopN controller (for more details, refer to our papers).

ViMMS also includes implementations of several common data-independent acquisition (DIA) strategies such as All-ion-fragmentation (AIF), SWATH-MS (Sequential Windowed Acquisition of All Theoretical Fragment Ion Mass Spectra).

The following are broad categories of controllers that are available in ViMMS:

AIF

Bases: Controller

A controller that implements the All-ion-fragmentation (AIF) DIA fragmentation strategy. Should be used in conjunction with MS-DIAL for deconvolution.

Source code in vimms/Controller/dia.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class AIF(Controller):
    """
    A controller that implements the All-ion-fragmentation (AIF) DIA fragmentation strategy.
    Should be used in conjunction with MS-DIAL for deconvolution.
    """

    def __init__(self, ms1_source_cid_energy, advanced_params=None):
        """
        Initialise an AIF controller
        Args:
            ms1_source_cid_energy: source CID energy for MS1 scan
            advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                             advanced parameters to control the mass spec. If left to None,
                             default values will be used.
        """
        super().__init__(advanced_params=advanced_params)
        self.scan_number = self.initial_scan_id
        self.ms1_source_cid_energy = ms1_source_cid_energy

    def write_msdial_experiment_file(self, filename):
        """
        Generates a file that can be read by MS-DIAL to perform deconvolution

        Args:
            filename: path to experiment file in MS-DIAL format

        Returns: None

        """
        heads = ["ID", "MS Type", "Start m/z", "End m/z", "Name", "CE", "DecTarget(1:Yes, 0:No)"]
        start = self.advanced_params.default_ms1_scan_window[0]
        stop = self.advanced_params.default_ms1_scan_window[1]
        ce = self.ms1_source_cid_energy
        ms1_row = ["0", "SCAN", start, stop, "0eV", 0, 0]
        aif_row = ["1", "ALL", start, stop, "{}eV".format(ce), ce, 1]

        out_dir = os.path.dirname(filename)
        create_if_not_exist(out_dir)

        with open(filename, "w", newline="") as f:
            writer = csv.writer(f, delimiter="\t", dialect="excel")
            writer.writerow(heads)
            writer.writerow(ms1_row)
            writer.writerow(aif_row)

    def update_state_after_scan(self, last_scan):
        pass

    def _process_scan(self, scan):
        """
        This method is called when a scan arrives that requires action.
        Normally means that we should schedule some more, but in DIA we don't need
        to actually look at the peaks in the scan, so just schedule the next block

        Args:
            scan: A new [vimms.MassSpec.Scan][] to process.

        Returns: newly generated scans

        """

        # For all ions fragmentation, when we receive the last scan of
        # the previous block, we make a new block. Each block is an MS1 scan
        # followed by an MS2 scan where the MS2 fragmens everything
        scans = []

        if self.scan_to_process is not None:
            # make the MS1 scan with source cid energy applied
            aif_scan = self.get_ms1_scan_params()
            aif_scan.set(ScanParameters.SOURCE_CID_ENERGY, self.ms1_source_cid_energy)
            self._check_scan(aif_scan)

            scans.append(aif_scan)
            self.scan_number += 1  # increase every time we make a scan

            # make the MS1 scan with no energy applied
            ms1_scan = self.get_ms1_scan_params()
            self._check_scan(ms1_scan)

            scans.append(ms1_scan)
            self.scan_number += 1
            self.next_processed_scan_id = self.scan_number

            # set this ms1 scan as has been processed
            self.scan_to_process = None

        return scans

__init__(ms1_source_cid_energy, advanced_params=None)

Initialise an AIF controller Args: ms1_source_cid_energy: source CID energy for MS1 scan advanced_params: an vimms.Controller.base.AdvancedParams object that contains advanced parameters to control the mass spec. If left to None, default values will be used.

Source code in vimms/Controller/dia.py
24
25
26
27
28
29
30
31
32
33
34
35
def __init__(self, ms1_source_cid_energy, advanced_params=None):
    """
    Initialise an AIF controller
    Args:
        ms1_source_cid_energy: source CID energy for MS1 scan
        advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                         advanced parameters to control the mass spec. If left to None,
                         default values will be used.
    """
    super().__init__(advanced_params=advanced_params)
    self.scan_number = self.initial_scan_id
    self.ms1_source_cid_energy = ms1_source_cid_energy

write_msdial_experiment_file(filename)

Generates a file that can be read by MS-DIAL to perform deconvolution

Parameters:
  • filename

    path to experiment file in MS-DIAL format

Returns: None

Source code in vimms/Controller/dia.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def write_msdial_experiment_file(self, filename):
    """
    Generates a file that can be read by MS-DIAL to perform deconvolution

    Args:
        filename: path to experiment file in MS-DIAL format

    Returns: None

    """
    heads = ["ID", "MS Type", "Start m/z", "End m/z", "Name", "CE", "DecTarget(1:Yes, 0:No)"]
    start = self.advanced_params.default_ms1_scan_window[0]
    stop = self.advanced_params.default_ms1_scan_window[1]
    ce = self.ms1_source_cid_energy
    ms1_row = ["0", "SCAN", start, stop, "0eV", 0, 0]
    aif_row = ["1", "ALL", start, stop, "{}eV".format(ce), ce, 1]

    out_dir = os.path.dirname(filename)
    create_if_not_exist(out_dir)

    with open(filename, "w", newline="") as f:
        writer = csv.writer(f, delimiter="\t", dialect="excel")
        writer.writerow(heads)
        writer.writerow(ms1_row)
        writer.writerow(aif_row)

AdvancedParams

An object that stores advanced parameters used to control the mass spec e.g. AGC target, collision energy, orbitrap resolution etc.

When ViMMS is connected to an actual mass spec instrument (Orbitrap Fusion in our case) via IAPI, most of these values are directly passed to the mass spec as they are. Generally you can leave these settings to their default.

In simulated use, most of these values are not used and therefore they won't affect simulated results.

Source code in vimms/Controller/base.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class AdvancedParams:
    """
    An object that stores advanced parameters used to control the mass spec
    e.g. AGC target, collision energy, orbitrap resolution etc.

    When ViMMS is connected to an actual mass spec instrument (Orbitrap Fusion in our case)
    via IAPI, most of these values are directly passed to the mass spec as they are.
    Generally you can leave these settings to their default.

    In simulated use, most of these values are not used and therefore they won't affect simulated
    results.
    """

    def __init__(
        self,
        default_ms1_scan_window=DEFAULT_MS1_SCAN_WINDOW,
        ms1_agc_target=DEFAULT_MS1_AGC_TARGET,
        ms1_max_it=DEFAULT_MS1_MAXIT,
        ms1_collision_energy=DEFAULT_MS1_COLLISION_ENERGY,
        ms1_orbitrap_resolution=DEFAULT_MS1_ORBITRAP_RESOLUTION,
        ms1_activation_type=DEFAULT_MS1_ACTIVATION_TYPE,
        ms1_mass_analyser=DEFAULT_MS1_MASS_ANALYSER,
        ms1_isolation_mode=DEFAULT_MS1_ISOLATION_MODE,
        ms1_source_cid_energy=DEFAULT_SOURCE_CID_ENERGY,
        ms2_agc_target=DEFAULT_MS2_AGC_TARGET,
        ms2_max_it=DEFAULT_MS2_MAXIT,
        ms2_collision_energy=DEFAULT_MS2_COLLISION_ENERGY,
        ms2_orbitrap_resolution=DEFAULT_MS2_ORBITRAP_RESOLUTION,
        ms2_activation_type=DEFAULT_MS2_ACTIVATION_TYPE,
        ms2_mass_analyser=DEFAULT_MS2_MASS_ANALYSER,
        ms2_isolation_mode=DEFAULT_MS2_ISOLATION_MODE,
        ms2_source_cid_energy=DEFAULT_SOURCE_CID_ENERGY,
    ):
        """
        Create an advanced parameter object

        Args:
            default_ms1_scan_window: the m/z window to perform MS1 scan
            ms1_agc_target: automatic gain control target for MS1 scan
            ms1_max_it: maximum time to acquire ions for MS1 scan
            ms1_collision_energy: the collision energy used for MS1 scan
            ms1_orbitrap_resolution: the Orbitrap resolution used for MS1 scan
            ms1_activation_type: the activation type for MS1 scan, either CID or HCD
            ms1_mass_analyser: the mass analyser to use for MS1 scan, either IonTrap or Orbitrap
            ms1_isolation_mode: the isolation mode for MS1 scan, either None, Quadrupole, IonTrap
            ms1_source_cid_energy: source CID energy
            ms2_agc_target: automatic gain control target for MS2 scan
            ms2_max_it: maximum time to acquire ions for MS2 scan
            ms2_collision_energy: the collision energy used for MS2 scan
            ms2_orbitrap_resolution: the Orbitrap resolution used for MS2 scan
            ms2_activation_type: the activation type for MS2 scan, either CID or HCD
            ms2_mass_analyser: the mass analyser to use for MS2 scan, either IonTrap or Orbitrap
            ms2_isolation_mode: the isolation mode for MS2 scan, either None, Quadrupole, IonTrap
            ms2_source_cid_energy: source CID energy
        """
        self.default_ms1_scan_window = default_ms1_scan_window

        self.ms1_agc_target = ms1_agc_target
        self.ms1_max_it = ms1_max_it
        self.ms1_collision_energy = ms1_collision_energy
        self.ms1_orbitrap_resolution = ms1_orbitrap_resolution
        self.ms1_activation_type = ms1_activation_type
        self.ms1_mass_analyser = ms1_mass_analyser
        self.ms1_isolation_mode = ms1_isolation_mode
        self.ms1_source_cid_energy = ms1_source_cid_energy

        self.ms2_agc_target = ms2_agc_target
        self.ms2_max_it = ms2_max_it
        self.ms2_collision_energy = ms2_collision_energy
        self.ms2_orbitrap_resolution = ms2_orbitrap_resolution
        self.ms2_activation_type = ms2_activation_type
        self.ms2_mass_analyser = ms2_mass_analyser
        self.ms2_isolation_mode = ms2_isolation_mode
        self.ms2_source_cid_energy = ms2_source_cid_energy

__init__(default_ms1_scan_window=DEFAULT_MS1_SCAN_WINDOW, ms1_agc_target=DEFAULT_MS1_AGC_TARGET, ms1_max_it=DEFAULT_MS1_MAXIT, ms1_collision_energy=DEFAULT_MS1_COLLISION_ENERGY, ms1_orbitrap_resolution=DEFAULT_MS1_ORBITRAP_RESOLUTION, ms1_activation_type=DEFAULT_MS1_ACTIVATION_TYPE, ms1_mass_analyser=DEFAULT_MS1_MASS_ANALYSER, ms1_isolation_mode=DEFAULT_MS1_ISOLATION_MODE, ms1_source_cid_energy=DEFAULT_SOURCE_CID_ENERGY, ms2_agc_target=DEFAULT_MS2_AGC_TARGET, ms2_max_it=DEFAULT_MS2_MAXIT, ms2_collision_energy=DEFAULT_MS2_COLLISION_ENERGY, ms2_orbitrap_resolution=DEFAULT_MS2_ORBITRAP_RESOLUTION, ms2_activation_type=DEFAULT_MS2_ACTIVATION_TYPE, ms2_mass_analyser=DEFAULT_MS2_MASS_ANALYSER, ms2_isolation_mode=DEFAULT_MS2_ISOLATION_MODE, ms2_source_cid_energy=DEFAULT_SOURCE_CID_ENERGY)

Create an advanced parameter object

Parameters:
  • default_ms1_scan_window

    the m/z window to perform MS1 scan

  • ms1_agc_target

    automatic gain control target for MS1 scan

  • ms1_max_it

    maximum time to acquire ions for MS1 scan

  • ms1_collision_energy

    the collision energy used for MS1 scan

  • ms1_orbitrap_resolution

    the Orbitrap resolution used for MS1 scan

  • ms1_activation_type

    the activation type for MS1 scan, either CID or HCD

  • ms1_mass_analyser

    the mass analyser to use for MS1 scan, either IonTrap or Orbitrap

  • ms1_isolation_mode

    the isolation mode for MS1 scan, either None, Quadrupole, IonTrap

  • ms1_source_cid_energy

    source CID energy

  • ms2_agc_target

    automatic gain control target for MS2 scan

  • ms2_max_it

    maximum time to acquire ions for MS2 scan

  • ms2_collision_energy

    the collision energy used for MS2 scan

  • ms2_orbitrap_resolution

    the Orbitrap resolution used for MS2 scan

  • ms2_activation_type

    the activation type for MS2 scan, either CID or HCD

  • ms2_mass_analyser

    the mass analyser to use for MS2 scan, either IonTrap or Orbitrap

  • ms2_isolation_mode

    the isolation mode for MS2 scan, either None, Quadrupole, IonTrap

  • ms2_source_cid_energy

    source CID energy

Source code in vimms/Controller/base.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def __init__(
    self,
    default_ms1_scan_window=DEFAULT_MS1_SCAN_WINDOW,
    ms1_agc_target=DEFAULT_MS1_AGC_TARGET,
    ms1_max_it=DEFAULT_MS1_MAXIT,
    ms1_collision_energy=DEFAULT_MS1_COLLISION_ENERGY,
    ms1_orbitrap_resolution=DEFAULT_MS1_ORBITRAP_RESOLUTION,
    ms1_activation_type=DEFAULT_MS1_ACTIVATION_TYPE,
    ms1_mass_analyser=DEFAULT_MS1_MASS_ANALYSER,
    ms1_isolation_mode=DEFAULT_MS1_ISOLATION_MODE,
    ms1_source_cid_energy=DEFAULT_SOURCE_CID_ENERGY,
    ms2_agc_target=DEFAULT_MS2_AGC_TARGET,
    ms2_max_it=DEFAULT_MS2_MAXIT,
    ms2_collision_energy=DEFAULT_MS2_COLLISION_ENERGY,
    ms2_orbitrap_resolution=DEFAULT_MS2_ORBITRAP_RESOLUTION,
    ms2_activation_type=DEFAULT_MS2_ACTIVATION_TYPE,
    ms2_mass_analyser=DEFAULT_MS2_MASS_ANALYSER,
    ms2_isolation_mode=DEFAULT_MS2_ISOLATION_MODE,
    ms2_source_cid_energy=DEFAULT_SOURCE_CID_ENERGY,
):
    """
    Create an advanced parameter object

    Args:
        default_ms1_scan_window: the m/z window to perform MS1 scan
        ms1_agc_target: automatic gain control target for MS1 scan
        ms1_max_it: maximum time to acquire ions for MS1 scan
        ms1_collision_energy: the collision energy used for MS1 scan
        ms1_orbitrap_resolution: the Orbitrap resolution used for MS1 scan
        ms1_activation_type: the activation type for MS1 scan, either CID or HCD
        ms1_mass_analyser: the mass analyser to use for MS1 scan, either IonTrap or Orbitrap
        ms1_isolation_mode: the isolation mode for MS1 scan, either None, Quadrupole, IonTrap
        ms1_source_cid_energy: source CID energy
        ms2_agc_target: automatic gain control target for MS2 scan
        ms2_max_it: maximum time to acquire ions for MS2 scan
        ms2_collision_energy: the collision energy used for MS2 scan
        ms2_orbitrap_resolution: the Orbitrap resolution used for MS2 scan
        ms2_activation_type: the activation type for MS2 scan, either CID or HCD
        ms2_mass_analyser: the mass analyser to use for MS2 scan, either IonTrap or Orbitrap
        ms2_isolation_mode: the isolation mode for MS2 scan, either None, Quadrupole, IonTrap
        ms2_source_cid_energy: source CID energy
    """
    self.default_ms1_scan_window = default_ms1_scan_window

    self.ms1_agc_target = ms1_agc_target
    self.ms1_max_it = ms1_max_it
    self.ms1_collision_energy = ms1_collision_energy
    self.ms1_orbitrap_resolution = ms1_orbitrap_resolution
    self.ms1_activation_type = ms1_activation_type
    self.ms1_mass_analyser = ms1_mass_analyser
    self.ms1_isolation_mode = ms1_isolation_mode
    self.ms1_source_cid_energy = ms1_source_cid_energy

    self.ms2_agc_target = ms2_agc_target
    self.ms2_max_it = ms2_max_it
    self.ms2_collision_energy = ms2_collision_energy
    self.ms2_orbitrap_resolution = ms2_orbitrap_resolution
    self.ms2_activation_type = ms2_activation_type
    self.ms2_mass_analyser = ms2_mass_analyser
    self.ms2_isolation_mode = ms2_isolation_mode
    self.ms2_source_cid_energy = ms2_source_cid_energy

AgentBasedController

Bases: Controller

A class that implements an agent-based controller.

Source code in vimms/Controller/abc.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class AgentBasedController(Controller):
    """
    A class that implements an agent-based controller.
    """

    def __init__(self, agent, advanced_params=None):
        """Initialises an agent-based controller.

        Arguments:
            agent: an instance of the [vimms.Agent.AbstractAgent][] class.
            advanced_params: optional advanced parameters for the mass spec.
        """
        super().__init__(advanced_params=advanced_params)
        self.agent = agent

    def _process_scan(self, scan):
        new_tasks = []
        if self.scan_to_process is not None:
            new_tasks, self.current_task_id, self.next_processed_scan_id = self.agent.next_tasks(
                self.scan_to_process, self, self.current_task_id
            )
            self.scan_to_process = None  # has been processed
        return new_tasks

    def update_state_after_scan(self, last_scan):
        self.agent.update(last_scan, self)

__init__(agent, advanced_params=None)

Initialises an agent-based controller.

Parameters:
  • agent

    an instance of the vimms.Agent.AbstractAgent class.

  • advanced_params

    optional advanced parameters for the mass spec.

Source code in vimms/Controller/abc.py
16
17
18
19
20
21
22
23
24
def __init__(self, agent, advanced_params=None):
    """Initialises an agent-based controller.

    Arguments:
        agent: an instance of the [vimms.Agent.AbstractAgent][] class.
        advanced_params: optional advanced parameters for the mass spec.
    """
    super().__init__(advanced_params=advanced_params)
    self.agent = agent

CaseControlNonOverlapController

Bases: TopNEXtController

Case-control non-overlap controller

Source code in vimms/Controller/box.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
class CaseControlNonOverlapController(TopNEXtController):
    """
    Case-control non-overlap controller
    """

    def __init__(
        self,
        ionisation_mode,
        isolation_width,
        N,
        mz_tol,
        rt_tol,
        min_ms1_intensity,
        roi_params,
        grid,
        smartroi_params=None,
        min_roi_length_for_fragmentation=1,
        ms1_shift=0,
        advanced_params=None,
        register_all_roi=False,
        scoring_params=GRID_CONTROLLER_SCORING_PARAMS,
        exclusion_method=ROI_EXCLUSION_DEW,
        exclusion_t_0=None,
    ):
        super().__init__(
            ionisation_mode,
            isolation_width,
            N,
            mz_tol,
            rt_tol,
            min_ms1_intensity,
            roi_params,
            grid,
            smartroi_params=smartroi_params,
            min_roi_length_for_fragmentation=min_roi_length_for_fragmentation,
            ms1_shift=ms1_shift,
            advanced_params=advanced_params,
            register_all_roi=register_all_roi,
            scoring_params=scoring_params,
            exclusion_method=exclusion_method,
            exclusion_t_0=exclusion_t_0,
        )
        self.scoring_params = scoring_params
        if self.scoring_params["theta3"] != 0 and self.register_all_roi is False:
            print("Warning: register_all_roi should be set to True id theta3 is not 0")

    def _get_scores(self):
        scores = [
            self.grid.case_control_non_overlap(
                r, self.current_roi_intensities[i], self.scoring_params
            )
            for i, r in enumerate(self.live_roi)
        ]
        return self._get_top_N_scores(scores * self._score_filters())

Controller

Bases: ABC

Abtract base class for controllers.

Source code in vimms/Controller/base.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
class Controller(ABC):
    """
    Abtract base class for controllers.
    """

    def __init__(self, advanced_params=None):
        """
        Initialise a base Controller class.

        Args:
            advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                             advanced parameters to control the mass spec. If left to None,
                             default values will be used.
        """
        if advanced_params is None:
            self.advanced_params = AdvancedParams()
        else:
            self.advanced_params = advanced_params

        self.scans = defaultdict(list)  # key: ms level, value: list of scans for that level
        self.scan_to_process = None
        self.environment = None
        self.next_processed_scan_id = INITIAL_SCAN_ID
        self.initial_scan_id = INITIAL_SCAN_ID
        self.current_task_id = self.initial_scan_id
        self.processing_times = []
        self.last_ms1_rt = 0.0

    def __repr__(self):
        return f"{type(self)}({','.join(f'{k}={v}' for k, v in self.__dict__.items())})"

    def get_ms1_scan_params(self, metadata=None):
        """
        Generate a default scan parameter for MS1 scan. The generated scan parameter object
        is typically passed to the mass spec (whether real or simulated) that produces
        the actual MS1 scan.
        Args:
            metadata: any additional metadata to include

        Returns: a [vimms.Common.ScanParameters][] object that describes the MS1 scan to generate.

        """
        task = get_default_scan_params(
            polarity=self.environment.mass_spec.ionisation_mode,
            default_ms1_scan_window=self.advanced_params.default_ms1_scan_window,
            agc_target=self.advanced_params.ms1_agc_target,
            max_it=self.advanced_params.ms1_max_it,
            collision_energy=self.advanced_params.ms1_collision_energy,
            source_cid_energy=self.advanced_params.ms1_source_cid_energy,
            orbitrap_resolution=self.advanced_params.ms1_orbitrap_resolution,
            activation_type=self.advanced_params.ms1_activation_type,
            mass_analyser=self.advanced_params.ms1_mass_analyser,
            isolation_mode=self.advanced_params.ms1_isolation_mode,
            metadata=metadata,
        )
        return task

    def get_ms2_scan_params(
        self, mz, intensity, precursor_scan_id, isolation_width, mz_tol, rt_tol, metadata=None
    ):
        """
        Generate a default scan parameter for MS2 scan. The generated scan parameter object
        is typically passed to the mass spec (whether real or simulated) that produces
        the actual MS2 scan.

        Args:
            mz: the m/z of the precursor ion to fragment
            intensity: the intensity of the precursor ion to fragment
            precursor_scan_id: the associated MS1 scan ID that contains this precursor ion
            isolation_width: isolation width, in Dalton
            mz_tol: m/z tolerance for dynamic exclusion (TODO: this shouldn't be here)
            rt_tol: RT tolerance for dynamic exclusion (TODO: this shouldn't be here)
            metadata: any additional metadata to include

        Returns: a [vimms.Common.ScanParameters][] object that describes the MS2 scan to generate.

        """
        task = get_dda_scan_param(
            mz,
            intensity,
            precursor_scan_id,
            isolation_width,
            mz_tol,
            rt_tol,
            agc_target=self.advanced_params.ms2_agc_target,
            max_it=self.advanced_params.ms2_max_it,
            collision_energy=self.advanced_params.ms2_collision_energy,
            source_cid_energy=self.advanced_params.ms2_source_cid_energy,
            orbitrap_resolution=self.advanced_params.ms2_orbitrap_resolution,
            activation_type=self.advanced_params.ms2_activation_type,
            mass_analyser=self.advanced_params.ms2_mass_analyser,
            isolation_mode=self.advanced_params.ms2_isolation_mode,
            polarity=self.environment.mass_spec.ionisation_mode,
            metadata=metadata,
        )
        return task

    def get_initial_tasks(self):
        """
        Gets the initial tasks to load immediately into the mass spec
        (before acquisition starts)

        Returns: an empty list of tasks, unless overridden by subclass

        """
        return []

    def get_initial_scan_params(self):
        """
        Gets the initial scan parameters to send to the mass spec that
        starts the whole process. Will default to sending an MS1 scan with
        whatever parameters passed in self.params. Subclasses can override
        this to return different types of scans.

        Returns: a [vimms.Common.ScanParameters][] object describing the initial scan to make.

        """
        return self.get_ms1_scan_params()

    def set_environment(self, env):
        """
        Set the environment used to run this controller

        Args:
            env: an [vimms.Environment.Environment] object or its subclasses.

        Returns: None

        """
        self.environment = env

    def handle_scan(self, scan, current_size, pending_size):
        """
        Basic codes to handle an incoming scan, which is generally the same for all controllers.

        Args:
            scan: A new [vimms.MassSpec.Scan][] to process.
            current_size: current size of task buffer
            pending_size: pending size of task buffer

        Returns: a list of new [vimms.Common.ScanParameters][] describing what to do next.

        """
        # record every scan that we've received
        self.scans[scan.ms_level].append(scan)

        # update ms1 time (used for ROI matching)
        if scan.ms_level == 1:
            self.last_ms1_rt = scan.rt
            self.last_ms1_scan = scan

        # we get an ms1 scan and it has some peaks AND all the pending tasks
        # have been sent and processed AND this ms1 scan is a custom scan
        # we'd sent before (not a method scan) then store it for
        # fragmentation next time

        if scan.scan_id == self.next_processed_scan_id:
            self.scan_to_process = scan
        else:
            self.scan_to_process = None

        # implemented by subclass
        if self.scan_to_process is not None:
            # track how long each scan takes to process
            start = time.time()
            new_tasks = self._process_scan(scan)
            elapsed = time.time() - start
            self.processing_times.append(elapsed)
        else:
            # this scan is not the one we want to process, but here we
            # pass it to _process_scan anyway in case the subclass wants
            # to do something with it
            new_tasks = self._process_scan(scan)
        return new_tasks

    @abstractmethod
    def update_state_after_scan(self, last_scan):
        """Update internal state after a scan has been processed.

        Arguments:
            last_scan ([vimms.MassSpec.Scan][]): the last-processed object.
        """
        pass

    @abstractmethod
    def _process_scan(self, scan):
        """Process incoming scan

        Arguments:
            scan: A new [vimms.MassSpec.Scan][] to process.
        """
        pass

    def dump_scans(self, output_method):
        """
        Dump all scans to the output format.
        Useful for debugging.

        Args:
            output_method: a function that accepts scan information as a CSV string from pandas

        Returns: None

        """
        all_scans = self.scans[1] + self.scans[2]
        all_scans.sort(key=lambda x: x.scan_id)  # sort by scan_id
        out_list = []
        for scan in all_scans:
            # ignore any scan that we didn't send (no scan_params)
            if scan.scan_params is not None:
                out = {
                    "scan_id": scan.scan_id,
                    "num_peaks": scan.num_peaks,
                    "rt": scan.rt,
                    "ms_level": scan.ms_level,
                }
                # add all the scan params to out
                out.update(scan.scan_params.get_all())
                out_list.append(out)

        # dump to csv
        df = pd.DataFrame(out_list)
        output_method(df.to_csv(index=False, line_terminator="\n"))

    def _check_scan(self, params):
        """
        Checks that the conditions that are checked in
        vimms-fusion MS class pass here. This is done to ensure that the values are
        all in a consistent state.

        Args:
            params: a [vimms.Common.ScanParameters][] object to check.

        Returns:

        """

        collision_energy = params.get(ScanParameters.COLLISION_ENERGY)
        orbitrap_resolution = params.get(ScanParameters.ORBITRAP_RESOLUTION)
        activation_type = params.get(ScanParameters.ACTIVATION_TYPE)
        mass_analyser = params.get(ScanParameters.MASS_ANALYSER)
        isolation_mode = params.get(ScanParameters.ISOLATION_MODE)
        agc_target = params.get(ScanParameters.AGC_TARGET)
        max_it = params.get(ScanParameters.MAX_IT)
        # source_cid_energy = params.get(ScanParameters.SOURCE_CID_ENERGY)
        polarity = params.get(ScanParameters.POLARITY)
        first_mass = params.get(ScanParameters.FIRST_MASS)
        last_mass = params.get(ScanParameters.LAST_MASS)

        assert collision_energy is not None
        assert orbitrap_resolution is not None
        assert activation_type is not None
        assert mass_analyser is not None
        assert isolation_mode is not None
        assert agc_target is not None
        assert max_it is not None
        assert polarity is not None
        assert first_mass is not None
        assert last_mass is not None

    def after_injection_cleanup(self):
        """
        Clean-up method at the end of each injection.

        Returns: None

        """
        pass

__init__(advanced_params=None)

Initialise a base Controller class.

Parameters:
Source code in vimms/Controller/base.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def __init__(self, advanced_params=None):
    """
    Initialise a base Controller class.

    Args:
        advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                         advanced parameters to control the mass spec. If left to None,
                         default values will be used.
    """
    if advanced_params is None:
        self.advanced_params = AdvancedParams()
    else:
        self.advanced_params = advanced_params

    self.scans = defaultdict(list)  # key: ms level, value: list of scans for that level
    self.scan_to_process = None
    self.environment = None
    self.next_processed_scan_id = INITIAL_SCAN_ID
    self.initial_scan_id = INITIAL_SCAN_ID
    self.current_task_id = self.initial_scan_id
    self.processing_times = []
    self.last_ms1_rt = 0.0

after_injection_cleanup()

Clean-up method at the end of each injection.

Returns: None

Source code in vimms/Controller/base.py
372
373
374
375
376
377
378
379
def after_injection_cleanup(self):
    """
    Clean-up method at the end of each injection.

    Returns: None

    """
    pass

dump_scans(output_method)

Dump all scans to the output format. Useful for debugging.

Parameters:
  • output_method

    a function that accepts scan information as a CSV string from pandas

Returns: None

Source code in vimms/Controller/base.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def dump_scans(self, output_method):
    """
    Dump all scans to the output format.
    Useful for debugging.

    Args:
        output_method: a function that accepts scan information as a CSV string from pandas

    Returns: None

    """
    all_scans = self.scans[1] + self.scans[2]
    all_scans.sort(key=lambda x: x.scan_id)  # sort by scan_id
    out_list = []
    for scan in all_scans:
        # ignore any scan that we didn't send (no scan_params)
        if scan.scan_params is not None:
            out = {
                "scan_id": scan.scan_id,
                "num_peaks": scan.num_peaks,
                "rt": scan.rt,
                "ms_level": scan.ms_level,
            }
            # add all the scan params to out
            out.update(scan.scan_params.get_all())
            out_list.append(out)

    # dump to csv
    df = pd.DataFrame(out_list)
    output_method(df.to_csv(index=False, line_terminator="\n"))

get_initial_scan_params()

Gets the initial scan parameters to send to the mass spec that starts the whole process. Will default to sending an MS1 scan with whatever parameters passed in self.params. Subclasses can override this to return different types of scans.

Returns: a vimms.Common.ScanParameters object describing the initial scan to make.

Source code in vimms/Controller/base.py
219
220
221
222
223
224
225
226
227
228
229
def get_initial_scan_params(self):
    """
    Gets the initial scan parameters to send to the mass spec that
    starts the whole process. Will default to sending an MS1 scan with
    whatever parameters passed in self.params. Subclasses can override
    this to return different types of scans.

    Returns: a [vimms.Common.ScanParameters][] object describing the initial scan to make.

    """
    return self.get_ms1_scan_params()

get_initial_tasks()

Gets the initial tasks to load immediately into the mass spec (before acquisition starts)

Returns: an empty list of tasks, unless overridden by subclass

Source code in vimms/Controller/base.py
209
210
211
212
213
214
215
216
217
def get_initial_tasks(self):
    """
    Gets the initial tasks to load immediately into the mass spec
    (before acquisition starts)

    Returns: an empty list of tasks, unless overridden by subclass

    """
    return []

get_ms1_scan_params(metadata=None)

Generate a default scan parameter for MS1 scan. The generated scan parameter object is typically passed to the mass spec (whether real or simulated) that produces the actual MS1 scan. Args: metadata: any additional metadata to include

Returns: a vimms.Common.ScanParameters object that describes the MS1 scan to generate.

Source code in vimms/Controller/base.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def get_ms1_scan_params(self, metadata=None):
    """
    Generate a default scan parameter for MS1 scan. The generated scan parameter object
    is typically passed to the mass spec (whether real or simulated) that produces
    the actual MS1 scan.
    Args:
        metadata: any additional metadata to include

    Returns: a [vimms.Common.ScanParameters][] object that describes the MS1 scan to generate.

    """
    task = get_default_scan_params(
        polarity=self.environment.mass_spec.ionisation_mode,
        default_ms1_scan_window=self.advanced_params.default_ms1_scan_window,
        agc_target=self.advanced_params.ms1_agc_target,
        max_it=self.advanced_params.ms1_max_it,
        collision_energy=self.advanced_params.ms1_collision_energy,
        source_cid_energy=self.advanced_params.ms1_source_cid_energy,
        orbitrap_resolution=self.advanced_params.ms1_orbitrap_resolution,
        activation_type=self.advanced_params.ms1_activation_type,
        mass_analyser=self.advanced_params.ms1_mass_analyser,
        isolation_mode=self.advanced_params.ms1_isolation_mode,
        metadata=metadata,
    )
    return task

get_ms2_scan_params(mz, intensity, precursor_scan_id, isolation_width, mz_tol, rt_tol, metadata=None)

Generate a default scan parameter for MS2 scan. The generated scan parameter object is typically passed to the mass spec (whether real or simulated) that produces the actual MS2 scan.

Parameters:
  • mz

    the m/z of the precursor ion to fragment

  • intensity

    the intensity of the precursor ion to fragment

  • precursor_scan_id

    the associated MS1 scan ID that contains this precursor ion

  • isolation_width

    isolation width, in Dalton

  • mz_tol

    m/z tolerance for dynamic exclusion (TODO: this shouldn't be here)

  • rt_tol

    RT tolerance for dynamic exclusion (TODO: this shouldn't be here)

  • metadata

    any additional metadata to include

Returns: a vimms.Common.ScanParameters object that describes the MS2 scan to generate.

Source code in vimms/Controller/base.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def get_ms2_scan_params(
    self, mz, intensity, precursor_scan_id, isolation_width, mz_tol, rt_tol, metadata=None
):
    """
    Generate a default scan parameter for MS2 scan. The generated scan parameter object
    is typically passed to the mass spec (whether real or simulated) that produces
    the actual MS2 scan.

    Args:
        mz: the m/z of the precursor ion to fragment
        intensity: the intensity of the precursor ion to fragment
        precursor_scan_id: the associated MS1 scan ID that contains this precursor ion
        isolation_width: isolation width, in Dalton
        mz_tol: m/z tolerance for dynamic exclusion (TODO: this shouldn't be here)
        rt_tol: RT tolerance for dynamic exclusion (TODO: this shouldn't be here)
        metadata: any additional metadata to include

    Returns: a [vimms.Common.ScanParameters][] object that describes the MS2 scan to generate.

    """
    task = get_dda_scan_param(
        mz,
        intensity,
        precursor_scan_id,
        isolation_width,
        mz_tol,
        rt_tol,
        agc_target=self.advanced_params.ms2_agc_target,
        max_it=self.advanced_params.ms2_max_it,
        collision_energy=self.advanced_params.ms2_collision_energy,
        source_cid_energy=self.advanced_params.ms2_source_cid_energy,
        orbitrap_resolution=self.advanced_params.ms2_orbitrap_resolution,
        activation_type=self.advanced_params.ms2_activation_type,
        mass_analyser=self.advanced_params.ms2_mass_analyser,
        isolation_mode=self.advanced_params.ms2_isolation_mode,
        polarity=self.environment.mass_spec.ionisation_mode,
        metadata=metadata,
    )
    return task

handle_scan(scan, current_size, pending_size)

Basic codes to handle an incoming scan, which is generally the same for all controllers.

Parameters:
  • scan

    A new vimms.MassSpec.Scan to process.

  • current_size

    current size of task buffer

  • pending_size

    pending size of task buffer

Returns: a list of new vimms.Common.ScanParameters describing what to do next.

Source code in vimms/Controller/base.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def handle_scan(self, scan, current_size, pending_size):
    """
    Basic codes to handle an incoming scan, which is generally the same for all controllers.

    Args:
        scan: A new [vimms.MassSpec.Scan][] to process.
        current_size: current size of task buffer
        pending_size: pending size of task buffer

    Returns: a list of new [vimms.Common.ScanParameters][] describing what to do next.

    """
    # record every scan that we've received
    self.scans[scan.ms_level].append(scan)

    # update ms1 time (used for ROI matching)
    if scan.ms_level == 1:
        self.last_ms1_rt = scan.rt
        self.last_ms1_scan = scan

    # we get an ms1 scan and it has some peaks AND all the pending tasks
    # have been sent and processed AND this ms1 scan is a custom scan
    # we'd sent before (not a method scan) then store it for
    # fragmentation next time

    if scan.scan_id == self.next_processed_scan_id:
        self.scan_to_process = scan
    else:
        self.scan_to_process = None

    # implemented by subclass
    if self.scan_to_process is not None:
        # track how long each scan takes to process
        start = time.time()
        new_tasks = self._process_scan(scan)
        elapsed = time.time() - start
        self.processing_times.append(elapsed)
    else:
        # this scan is not the one we want to process, but here we
        # pass it to _process_scan anyway in case the subclass wants
        # to do something with it
        new_tasks = self._process_scan(scan)
    return new_tasks

set_environment(env)

Set the environment used to run this controller

Parameters:
  • env

    an [vimms.Environment.Environment] object or its subclasses.

Returns: None

Source code in vimms/Controller/base.py
231
232
233
234
235
236
237
238
239
240
241
def set_environment(self, env):
    """
    Set the environment used to run this controller

    Args:
        env: an [vimms.Environment.Environment] object or its subclasses.

    Returns: None

    """
    self.environment = env

update_state_after_scan(last_scan) abstractmethod

Update internal state after a scan has been processed.

Parameters:
  • last_scan ([vimms.MassSpec.Scan][]) –

    the last-processed object.

Source code in vimms/Controller/base.py
287
288
289
290
291
292
293
294
@abstractmethod
def update_state_after_scan(self, last_scan):
    """Update internal state after a scan has been processed.

    Arguments:
        last_scan ([vimms.MassSpec.Scan][]): the last-processed object.
    """
    pass

DEWFilter

Bases: ScoreFilter

A class that implements dynamic exclusion filter

Source code in vimms/Exclusion.py
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
class DEWFilter(ScoreFilter):
    """
    A class that implements dynamic exclusion filter
    """

    def __init__(self, rt_tol):
        """
        Initialises a dynamic exclusion filter based on time only
        Args:
            rt_tol: the RT tolerance (in seconds)
        """
        self.rt_tol = rt_tol

    def filter(self, current_rt, rois):
        """
        Check whether intensity values are above or below the threshold
        Args:
            current_rt: the current RT value
            rois: a list of [vimms.Roi.Roi][] objects.

        Returns: an array of indicators for the filter
        """

        last_frag_rts = [roi.last_frag_rt for roi in rois]

        # Handles None values by converting to NaN for which all
        # comparisons return 0
        return np.logical_not(current_rt - np.array(last_frag_rts, dtype=np.double) <= self.rt_tol)

__init__(rt_tol)

Initialises a dynamic exclusion filter based on time only Args: rt_tol: the RT tolerance (in seconds)

Source code in vimms/Exclusion.py
492
493
494
495
496
497
498
def __init__(self, rt_tol):
    """
    Initialises a dynamic exclusion filter based on time only
    Args:
        rt_tol: the RT tolerance (in seconds)
    """
    self.rt_tol = rt_tol

filter(current_rt, rois)

Check whether intensity values are above or below the threshold Args: current_rt: the current RT value rois: a list of vimms.Roi.Roi objects.

Returns: an array of indicators for the filter

Source code in vimms/Exclusion.py
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
def filter(self, current_rt, rois):
    """
    Check whether intensity values are above or below the threshold
    Args:
        current_rt: the current RT value
        rois: a list of [vimms.Roi.Roi][] objects.

    Returns: an array of indicators for the filter
    """

    last_frag_rts = [roi.last_frag_rt for roi in rois]

    # Handles None values by converting to NaN for which all
    # comparisons return 0
    return np.logical_not(current_rt - np.array(last_frag_rts, dtype=np.double) <= self.rt_tol)

DiaController

Bases: Controller

A class for doing tree and nested DIA methods. Also has a SWATH type controller, but reccommend to use SWATH class above. Method uses windows methods from DIA.py to create the pattern of windows needed to run the controllers. Note: the following method used multiple simultaneous isolation windows

Source code in vimms/Controller/dia.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
class DiaController(Controller):
    """
    A class for doing tree and nested DIA methods.
    Also has a SWATH type controller, but reccommend to use SWATH class
    above. Method uses windows methods from DIA.py to create the pattern
    of windows needed to run the controllers.
    Note: the following method used multiple simultaneous isolation windows
    """

    def __init__(
        self,
        min_mz,
        max_mz,  # TODO: add scan overlap to DiaWindows
        window_type,
        kaufmann_design,
        num_windows,
        scan_overlap=0,
        extra_bins=0,
        dia_design="kaufmann",
        advanced_params=None,
    ):
        super().__init__(advanced_params=advanced_params)
        self.dia_design = dia_design
        self.window_type = window_type
        self.kaufmann_design = kaufmann_design
        self.extra_bins = extra_bins
        self.num_windows = num_windows
        self.scan_overlap = scan_overlap
        self.min_mz = min_mz  # scan from this mz
        self.max_mz = max_mz  # scan to this mz

        self.scan_number = self.initial_scan_id

    def update_state_after_scan(self, last_scan):
        pass

    def _process_scan(self, scan):
        # if there's a previous ms1 scan to process
        new_tasks = []
        if self.scan_to_process is not None:

            mz_tol = 10  # not used
            rt_tol = 15  # these are not used

            precursor_scan_id = self.scan_to_process.scan_id

            mzs = self.scan_to_process.mzs
            if len(mzs) > 0:  # check that ms1 scan is not empty
                default_range = [(self.min_mz, self.max_mz)]
                locations = DiaWindows(
                    mzs,
                    default_range,
                    self.dia_design,
                    self.window_type,
                    self.kaufmann_design,
                    self.extra_bins,
                    self.num_windows,
                ).locations
                for loc in locations:
                    mz = []
                    isolation_width = []
                    intensity = []
                    for sub_loc in loc[0]:
                        mz.append(sum(sub_loc) / 2)
                        isolation_width.append(sub_loc[1] - sub_loc[0])
                        intensity.append(0)
                    dda_scan_params = self.get_ms2_scan_params(
                        mz, intensity, precursor_scan_id, isolation_width, mz_tol, rt_tol
                    )

                    # push this dda scan to the mass spec queue
                    new_tasks.append(dda_scan_params)
            else:
                locations = []

            # make the MS1 scan
            task = self.get_ms1_scan_params()
            new_tasks.append(task)

            self.scan_number += len(locations) + 1
            self.next_processed_scan_id = self.scan_number
        return new_tasks

DiaWindows

Class for creating windows for basic, tree and nested DIA methods. Method is used in DiaController in Controller/dia. Basic methods are approximately equal to a SWATH method with no overlapping windows

Source code in vimms/DIA.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class DiaWindows:
    """
    Class for creating windows for basic, tree and nested DIA methods. Method is used in
    DiaController in Controller/dia. Basic methods are approximately equal to a SWATH method
    with no overlapping windows
    """

    # flake8: noqa: C901
    def __init__(
        self,
        ms1_mzs,
        ms1_range,
        dia_design,
        window_type,
        kaufmann_design,
        extra_bins,
        num_windows=None,
        range_slack=0.01,
    ):
        ms1_range_difference = ms1_range[0][1] - ms1_range[0][0]
        # set the number of windows for kaufmann method
        if dia_design == "kaufmann":
            num_windows = 64
        # dont allow extra bins for basic method
        if dia_design == "basic" and extra_bins > 0:
            raise ValueError("Cannot have extra bins with 'basic' dia design.")
        # find bin walls and extra bin walls
        if window_type == "even":
            internal_bin_walls = [ms1_range[0][0]]
            for window_index in range(0, num_windows):
                internal_bin_walls.append(
                    ms1_range[0][0] + ((window_index + 1) / num_windows) * ms1_range_difference
                )
            internal_bin_walls[0] = internal_bin_walls[0] - range_slack * ms1_range_difference
            internal_bin_walls[-1] = internal_bin_walls[-1] + range_slack * ms1_range_difference
            internal_bin_walls_extra = None
            if extra_bins > 0:
                internal_bin_walls_extra = [ms1_range[0][0]]
                for window_index in range(0, num_windows * (2**extra_bins)):
                    internal_bin_walls_extra.append(
                        ms1_range[0][0]
                        + ((window_index + 1) / (num_windows * (2**extra_bins)))
                        * ms1_range_difference
                    )
                internal_bin_walls_extra[0] = (
                    internal_bin_walls_extra[0] - range_slack * ms1_range_difference
                )
                internal_bin_walls_extra[-1] = (
                    internal_bin_walls_extra[-1] + range_slack * ms1_range_difference
                )
        elif window_type == "percentile":
            internal_bin_walls = np.percentile(
                ms1_mzs, np.arange(0, 100 + 100 / num_windows, 100 / num_windows)
            ).tolist()
            internal_bin_walls[0] = internal_bin_walls[0] - range_slack * ms1_range_difference
            internal_bin_walls[-1] = internal_bin_walls[-1] + range_slack * ms1_range_difference
            internal_bin_walls_extra = None
            if extra_bins > 0:
                internal_bin_walls_extra = np.percentile(
                    ms1_mzs,
                    np.arange(
                        0,
                        100 + 100 / (num_windows * (2**extra_bins)),
                        100 / (num_windows * (2**extra_bins)),
                    ),
                ).tolist()
                internal_bin_walls_extra[0] = (
                    internal_bin_walls_extra[0] - range_slack * ms1_range_difference
                )
                internal_bin_walls_extra[-1] = (
                    internal_bin_walls_extra[-1] + range_slack * ms1_range_difference
                )
        else:
            raise ValueError("Incorrect window_type selected. Must be 'even' or 'percentile'.")
            # convert bin walls and extra bin walls into locations to scan
        if dia_design == "basic":
            self.locations = []
            for window_index in range(0, num_windows):
                self.locations.append(
                    [[(internal_bin_walls[window_index], internal_bin_walls[window_index + 1])]]
                )
        elif dia_design == "kaufmann":
            self.locations = KaufmannWindows(
                internal_bin_walls, internal_bin_walls_extra, kaufmann_design, extra_bins
            ).locations
        else:
            raise ValueError("Incorrect dia_design selected. Must be 'basic' or 'kaufmann'.")

DsDAController

Bases: WrapperController

A controller which allows running the DsDA (Dataset-Dependent Acquisition) method.

See the original publication for a description of DsDA:

Broeckling, Hoyes, et al. "Comprehensive Tandem-Mass-Spectrometry coverage of complex samples enabled by Data-Set-Dependent acquisition." Analytical Chemistry. 90, 8020–8027 (2018).

Source code in vimms/Controller/misc.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
class DsDAController(WrapperController):
    """
    A controller which allows running the DsDA  (Dataset-Dependent Acquisition)
    method.

    See the original publication for a description of DsDA:

    Broeckling, Hoyes, et al. "Comprehensive Tandem-Mass-Spectrometry coverage
    of complex samples enabled by Data-Set-Dependent acquisition."
    Analytical Chemistry. 90, 8020–8027 (2018).
    """

    def __init__(self, dsda_state, mzml_name, advanced_params=None, task_filter=None):
        """
        Initialise a new DsDAController instance.

        Args:
            dsda_state: An instance of [vimms.DsDA.DsDAState][], wrapping a
            live R process running DsDA.
            mzml_name: The name of the .mzML file to write for this injection.
            advanced_params: a [vimms.Controller.base.AdvancedParams][] object
                that contains advanced parameters to control the mass spec.
                See [vimms.Controller.base.AdvancedParams][] for defaults.
            task_filter: Object that examines the task list and adds or deletes
                tasks to ensure schedule remains in sync with the actual RT.
        """
        self.dsda_state = dsda_state
        self.mzml_name = mzml_name
        self.task_filter = task_filter

        if dsda_state.file_num == 0:
            self.controller = self.dsda_state.get_base_controller()
        else:
            schedule_params, rts = self.dsda_state.get_scan_params()
            self.controller = FixedScansController(
                schedule=schedule_params,
                advanced_params=advanced_params,
                expected_rts=rts,
                task_filter=task_filter,
            )

        print(self.controller)
        super().__init__()

    def after_injection_cleanup(self):
        self.dsda_state.register_mzml(self.mzml_name)

__init__(dsda_state, mzml_name, advanced_params=None, task_filter=None)

Initialise a new DsDAController instance.

Parameters:
  • dsda_state

    An instance of [vimms.DsDA.DsDAState][], wrapping a

  • mzml_name

    The name of the .mzML file to write for this injection.

  • advanced_params

    a vimms.Controller.base.AdvancedParams object that contains advanced parameters to control the mass spec. See vimms.Controller.base.AdvancedParams for defaults.

  • task_filter

    Object that examines the task list and adds or deletes tasks to ensure schedule remains in sync with the actual RT.

Source code in vimms/Controller/misc.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
def __init__(self, dsda_state, mzml_name, advanced_params=None, task_filter=None):
    """
    Initialise a new DsDAController instance.

    Args:
        dsda_state: An instance of [vimms.DsDA.DsDAState][], wrapping a
        live R process running DsDA.
        mzml_name: The name of the .mzML file to write for this injection.
        advanced_params: a [vimms.Controller.base.AdvancedParams][] object
            that contains advanced parameters to control the mass spec.
            See [vimms.Controller.base.AdvancedParams][] for defaults.
        task_filter: Object that examines the task list and adds or deletes
            tasks to ensure schedule remains in sync with the actual RT.
    """
    self.dsda_state = dsda_state
    self.mzml_name = mzml_name
    self.task_filter = task_filter

    if dsda_state.file_num == 0:
        self.controller = self.dsda_state.get_base_controller()
    else:
        schedule_params, rts = self.dsda_state.get_scan_params()
        self.controller = FixedScansController(
            schedule=schedule_params,
            advanced_params=advanced_params,
            expected_rts=rts,
            task_filter=task_filter,
        )

    print(self.controller)
    super().__init__()

FixedScansController

Bases: Controller

A controller which takes a schedule of scans, and converts them into tasks in a queue.

The base class for pre-scheduled controllers like vimms.Controller.misc.DsDAController and vimms.Controller.misc.MatchingController.

Source code in vimms/Controller/misc.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
class FixedScansController(Controller):
    """
    A controller which takes a schedule of scans, and converts them into
    tasks in a queue.

    The base class for pre-scheduled controllers like
    [vimms.Controller.misc.DsDAController][] and
    [vimms.Controller.misc.MatchingController][].
    """

    def __init__(self, schedule=None, advanced_params=None, expected_rts=None, task_filter=None):
        """
        Create a FixedScansController.

        Args:
            schedule: List of [vimms.Common.ScanParameter][] objects.
            advanced_params: Instance of [vimms.Controller.base.AdvancedParams][].
            expected_rts: List of expected RTs for tasks with indices corresponding
                to schedule. Only needed if e.g. resynchronising RTs using
                task_filter.
            task_filter: Object that dynamically returns tasks on request. For
                example [vimms.Controller.misc.TaskFilter][] can be used to
                resynchronise unexpected RTs with the expected ones in the schedule.
        """
        super().__init__(advanced_params=advanced_params)
        self.tasks = None
        self.initial_task = None
        self.task_idx = 1  # First scan of schedule is always run
        self.expected_rts = expected_rts
        self.task_filter = task_filter

        if schedule is not None and len(schedule) > 0:
            # if schedule is provided, set it
            self.set_tasks(schedule)
            self.scan_id = self.initial_task.get(ScanParameters.SCAN_ID) + 1
            self.precursor_id = (
                self.scan_id - 1 if self.initial_task.get(ScanParameters.MS_LEVEL) == 1 else None
            )

    def get_initial_tasks(self):
        """
        Returns all initial tasks for the mass spec queue.

        Returns: List of tasks.
        """
        # the remaining scan parameters in the schedule must have been set
        assert self.tasks is not None
        if self.task_filter is None:
            return self.tasks[1:]
        else:
            return []

    def get_initial_scan_params(self):
        """
        Returns the initial scan parameter object to send when
        acquisition starts

        Returns: The initial task.
        """
        # the first scan parameters in the schedule must have been set
        assert self.initial_task is not None
        return self.initial_task

    def set_tasks(self, schedule):
        """
        Set a new schedule for this controller.

        Args:
            schedule: A list of [vimms.Common.ScanParameter][].
        """
        assert isinstance(schedule, list)
        self.initial_task = schedule[0]  # used for sending the first scan
        self.tasks = schedule  # used for sending all the other scans

    def handle_scan(self, scan, current_size, pending_size):
        # simply record every scan that we've received, but return no new tasks
        logger.debug("Time %f Received %s" % (scan.rt, scan))
        self.scans[scan.ms_level].append(scan)
        return self._process_scan(scan)

    def update_state_after_scan(self, last_scan):
        pass

    def _process_scan(self, scan):
        if self.task_filter is None:
            return []
        else:
            self.task_idx, new_task = self.task_filter.get_task(
                scan, self.scan_id, self.precursor_id, self.task_idx, self.expected_rts, self.tasks
            )
            if new_task.get(ScanParameters.MS_LEVEL) == 1:
                self.precursor_id = self.scan_id
            self.scan_id += 1
            return [new_task]

__init__(schedule=None, advanced_params=None, expected_rts=None, task_filter=None)

Create a FixedScansController.

Parameters:
  • schedule

    List of [vimms.Common.ScanParameter][] objects.

  • advanced_params
  • expected_rts

    List of expected RTs for tasks with indices corresponding to schedule. Only needed if e.g. resynchronising RTs using task_filter.

  • task_filter

    Object that dynamically returns tasks on request. For example vimms.Controller.misc.TaskFilter can be used to resynchronise unexpected RTs with the expected ones in the schedule.

Source code in vimms/Controller/misc.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def __init__(self, schedule=None, advanced_params=None, expected_rts=None, task_filter=None):
    """
    Create a FixedScansController.

    Args:
        schedule: List of [vimms.Common.ScanParameter][] objects.
        advanced_params: Instance of [vimms.Controller.base.AdvancedParams][].
        expected_rts: List of expected RTs for tasks with indices corresponding
            to schedule. Only needed if e.g. resynchronising RTs using
            task_filter.
        task_filter: Object that dynamically returns tasks on request. For
            example [vimms.Controller.misc.TaskFilter][] can be used to
            resynchronise unexpected RTs with the expected ones in the schedule.
    """
    super().__init__(advanced_params=advanced_params)
    self.tasks = None
    self.initial_task = None
    self.task_idx = 1  # First scan of schedule is always run
    self.expected_rts = expected_rts
    self.task_filter = task_filter

    if schedule is not None and len(schedule) > 0:
        # if schedule is provided, set it
        self.set_tasks(schedule)
        self.scan_id = self.initial_task.get(ScanParameters.SCAN_ID) + 1
        self.precursor_id = (
            self.scan_id - 1 if self.initial_task.get(ScanParameters.MS_LEVEL) == 1 else None
        )

get_initial_scan_params()

Returns the initial scan parameter object to send when acquisition starts

Returns: The initial task.

Source code in vimms/Controller/misc.py
224
225
226
227
228
229
230
231
232
233
def get_initial_scan_params(self):
    """
    Returns the initial scan parameter object to send when
    acquisition starts

    Returns: The initial task.
    """
    # the first scan parameters in the schedule must have been set
    assert self.initial_task is not None
    return self.initial_task

get_initial_tasks()

Returns all initial tasks for the mass spec queue.

Returns: List of tasks.

Source code in vimms/Controller/misc.py
211
212
213
214
215
216
217
218
219
220
221
222
def get_initial_tasks(self):
    """
    Returns all initial tasks for the mass spec queue.

    Returns: List of tasks.
    """
    # the remaining scan parameters in the schedule must have been set
    assert self.tasks is not None
    if self.task_filter is None:
        return self.tasks[1:]
    else:
        return []

set_tasks(schedule)

Set a new schedule for this controller.

Parameters:
  • schedule

    A list of [vimms.Common.ScanParameter][].

Source code in vimms/Controller/misc.py
235
236
237
238
239
240
241
242
243
244
def set_tasks(self, schedule):
    """
    Set a new schedule for this controller.

    Args:
        schedule: A list of [vimms.Common.ScanParameter][].
    """
    assert isinstance(schedule, list)
    self.initial_task = schedule[0]  # used for sending the first scan
    self.tasks = schedule  # used for sending all the other scans

FlexibleNonOverlapController

Bases: TopNEXtController

TODO: this class can probably be removed.

Source code in vimms/Controller/box.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
class FlexibleNonOverlapController(TopNEXtController):
    """
    TODO: this class can probably be removed.
    """

    def __init__(
        self,
        ionisation_mode,
        isolation_width,
        N,
        mz_tol,
        rt_tol,
        min_ms1_intensity,
        roi_params,
        grid,
        smartroi_params=None,
        min_roi_length_for_fragmentation=1,
        ms1_shift=0,
        advanced_params=None,
        register_all_roi=False,
        scoring_params=GRID_CONTROLLER_SCORING_PARAMS,
        exclusion_method=ROI_EXCLUSION_DEW,
        exclusion_t_0=None,
    ):  # weighted dew parameters
        super().__init__(
            ionisation_mode,
            isolation_width,
            N,
            mz_tol,
            rt_tol,
            min_ms1_intensity,
            roi_params,
            grid,
            smartroi_params=smartroi_params,
            min_roi_length_for_fragmentation=min_roi_length_for_fragmentation,
            ms1_shift=ms1_shift,
            advanced_params=advanced_params,
            register_all_roi=register_all_roi,
            scoring_params=scoring_params,
            exclusion_method=exclusion_method,
            exclusion_t_0=exclusion_t_0,
        )
        self.scoring_params = scoring_params
        if self.scoring_params["theta3"] != 0 and self.register_all_roi is False:
            print("Warning: register_all_roi should be set to True id theta3 is not 0")

    def _overlap_scores(self):
        scores = [
            self.grid.flexible_non_overlap(
                r, self.roi_builder.current_roi_intensities[i], self.scoring_params
            )
            for i, r in enumerate(self.roi_builder.live_roi)
        ]
        return scores

GenericBox

Bases: Box

Makes no particular assumptions about bounding boxes.

Source code in vimms/Box.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
class GenericBox(Box):
    """Makes no particular assumptions about bounding boxes."""

    def __repr__(self):
        return "Generic{}".format(super().__repr__())

    def contains_point(self, pt):
        return (
            self.pt1.x <= pt.x and self.pt1.y <= pt.y and self.pt2.x >= pt.x and self.pt2.y >= pt.y
        )

    def interval_contains(self, inv):
        if inv.is_vertical():
            return (
                self.pt1.x <= inv.pt1.x
                and self.pt2.x >= inv.pt1.x
                and self.pt1.y >= inv.pt1.y
                and self.pt2.y <= inv.pt2.y
            )
        else:
            return (
                self.pt1.x >= inv.pt1.x
                and self.pt2.x <= inv.pt2.x
                and self.pt1.y <= inv.pt1.y
                and self.pt2.y >= inv.pt1.y
            )

    def overlaps_with_box(self, other_box):
        return (
            self.pt1.x < other_box.pt2.x
            and self.pt2.x > other_box.pt1.x
            and self.pt1.y < other_box.pt2.y
            and self.pt2.y > other_box.pt1.y
        )

    def contains_box(self, other_box):
        return (
            self.pt1.x <= other_box.pt1.x
            and self.pt1.y <= other_box.pt1.y
            and self.pt2.x >= other_box.pt2.x
            and self.pt2.y >= other_box.pt2.y
        )

    def combine_max(self, other_box):
        return GenericBox(
            min(self.pt1.x, other_box.pt1.x),
            max(self.pt2.x, other_box.pt2.x),
            min(self.pt1.y, other_box.pt1.y),
            max(self.pt2.y, other_box.pt2.y),
            parents=(self.parents + other_box.parents),
            intensity=max(self.intensity, other_box.intensity),
        )

    def apply_min_box_ppm(self, xwidth=None, ywidth=None, round_digits=8):
        x1, y1 = self.pt1
        x2, y2 = self.pt2

        if xwidth is not None:
            mid = (x1 + x2) / 2
            dist = (xwidth / 1e6) * mid
            if dist > x2 - x1:
                x1 = mid - dist / 2
                x2 = mid + dist / 2

        if ywidth is not None:
            mid = (y1 + y2) / 2
            dist = (ywidth / 1e6) * mid
            if dist > y2 - y1:
                y1 = mid - dist / 2
                y2 = mid + dist / 2

        new_box = self.copy()
        new_box.pt1.x, new_box.pt1.y = x1, y1
        new_box.pt2.x, new_box.pt2.y = x2, y2
        new_box.round(ndigits=round_digits)
        return new_box

    def overlap_raw(self, other_box):
        if not self.overlaps_with_box(other_box):
            return 0.0
        return (min(self.pt2.x, other_box.pt2.x) - max(self.pt1.x, other_box.pt1.x)) * (
            min(self.pt2.y, other_box.pt2.y) - max(self.pt1.y, other_box.pt1.y)
        )

    def overlap_2(self, other_box):
        if not self.overlaps_with_box(other_box):
            return 0.0
        b = type(self)(
            max(self.pt1.x, other_box.pt1.x),
            min(self.pt2.x, other_box.pt2.x),
            max(self.pt1.y, other_box.pt1.y),
            min(self.pt2.y, other_box.pt2.y),
        )
        return b.area() / (self.area() + other_box.area() - b.area())

    def overlap_3(self, other_box):
        if not self.overlaps_with_box(other_box):
            return 0.0
        b = type(self)(
            max(self.pt1.x, other_box.pt1.x),
            min(self.pt2.x, other_box.pt2.x),
            max(self.pt1.y, other_box.pt1.y),
            min(self.pt2.y, other_box.pt2.y),
        )
        return b.area() / self.area()

    def non_overlap_split(self, other_box):
        """Finds 1 to 4 boxes describing the polygon of area of this box
        not overlapped by other_box. If one box is found, crops this box to
        dimensions of that box, and returns None. Otherwise, returns list of
        2 to 4 boxes. Number of boxes found is equal to number of edges
        overlapping area does NOT share with this box."""
        if not self.overlaps_with_box(other_box):
            return None
        x1, x2, y1, y2 = self.pt1.x, self.pt2.x, self.pt1.y, self.pt2.y
        split_boxes = []
        if other_box.pt1.x > self.pt1.x:
            x1 = other_box.pt1.x
            split_boxes.append(
                type(self)(self.pt1.x, x1, y1, y2, parents=self.parents, intensity=self.intensity)
            )
        if other_box.pt2.x < self.pt2.x:
            x2 = other_box.pt2.x
            split_boxes.append(
                type(self)(x2, self.pt2.x, y1, y2, parents=self.parents, intensity=self.intensity)
            )
        if other_box.pt1.y > self.pt1.y:
            y1 = other_box.pt1.y
            split_boxes.append(
                type(self)(x1, x2, self.pt1.y, y1, parents=self.parents, intensity=self.intensity)
            )
        if other_box.pt2.y < self.pt2.y:
            y2 = other_box.pt2.y
            split_boxes.append(
                type(self)(x1, x2, y2, self.pt2.y, parents=self.parents, intensity=self.intensity)
            )
        return split_boxes

    def split_all(self, other_box):
        if not self.overlaps_with_box(other_box):
            return None, None, None
        both_parents = self.parents + other_box.parents
        both_box = type(self)(
            max(self.pt1.x, other_box.pt1.x),
            min(self.pt2.x, other_box.pt2.x),
            max(self.pt1.y, other_box.pt1.y),
            min(self.pt2.y, other_box.pt2.y),
            parents=both_parents,
            intensity=max(self.intensity, other_box.intensity),
        )
        b1_boxes = self.non_overlap_split(other_box)
        b2_boxes = other_box.non_overlap_split(self)
        return b1_boxes, b2_boxes, both_box

non_overlap_split(other_box)

Finds 1 to 4 boxes describing the polygon of area of this box not overlapped by other_box. If one box is found, crops this box to dimensions of that box, and returns None. Otherwise, returns list of 2 to 4 boxes. Number of boxes found is equal to number of edges overlapping area does NOT share with this box.

Source code in vimms/Box.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def non_overlap_split(self, other_box):
    """Finds 1 to 4 boxes describing the polygon of area of this box
    not overlapped by other_box. If one box is found, crops this box to
    dimensions of that box, and returns None. Otherwise, returns list of
    2 to 4 boxes. Number of boxes found is equal to number of edges
    overlapping area does NOT share with this box."""
    if not self.overlaps_with_box(other_box):
        return None
    x1, x2, y1, y2 = self.pt1.x, self.pt2.x, self.pt1.y, self.pt2.y
    split_boxes = []
    if other_box.pt1.x > self.pt1.x:
        x1 = other_box.pt1.x
        split_boxes.append(
            type(self)(self.pt1.x, x1, y1, y2, parents=self.parents, intensity=self.intensity)
        )
    if other_box.pt2.x < self.pt2.x:
        x2 = other_box.pt2.x
        split_boxes.append(
            type(self)(x2, self.pt2.x, y1, y2, parents=self.parents, intensity=self.intensity)
        )
    if other_box.pt1.y > self.pt1.y:
        y1 = other_box.pt1.y
        split_boxes.append(
            type(self)(x1, x2, self.pt1.y, y1, parents=self.parents, intensity=self.intensity)
        )
    if other_box.pt2.y < self.pt2.y:
        y2 = other_box.pt2.y
        split_boxes.append(
            type(self)(x1, x2, y2, self.pt2.y, parents=self.parents, intensity=self.intensity)
        )
    return split_boxes

IdleController

Bases: Controller

A controller that doesn't do any controlling. Mostly used as a skeleton code to illustrate the code structure in ViMMS controllers.

Source code in vimms/Controller/fullscan.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class IdleController(Controller):
    """
    A controller that doesn't do any controlling.
    Mostly used as a skeleton code to illustrate the code structure in ViMMS controllers.
    """

    def __init__(self, advanced_params=None):
        """
        Initialise an idle controller
        Args:
            advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                             advanced parameters to control the mass spec. If left to None,
                             default values will be used.
        """
        super().__init__(advanced_params=advanced_params)

    def _process_scan(self, scan):
        new_tasks = []
        return new_tasks

    def update_state_after_scan(self, last_scan):
        pass

    def reset(self):
        pass

__init__(advanced_params=None)

Initialise an idle controller Args: advanced_params: an vimms.Controller.base.AdvancedParams object that contains advanced parameters to control the mass spec. If left to None, default values will be used.

Source code in vimms/Controller/fullscan.py
15
16
17
18
19
20
21
22
23
def __init__(self, advanced_params=None):
    """
    Initialise an idle controller
    Args:
        advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                         advanced parameters to control the mass spec. If left to None,
                         default values will be used.
    """
    super().__init__(advanced_params=advanced_params)

IntensityNonOverlapController

Bases: IntensityTopNEXtController

A variant of the non-overlap controller but it takes into account intensity changes.

Source code in vimms/Controller/box.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class IntensityNonOverlapController(IntensityTopNEXtController):
    """
    A variant of the non-overlap controller but it takes into account intensity changes.
    """

    def _overlap_scores(self):
        new_intensities = np.log(
            [
                self.grid.intensity_non_overlap(
                    r, self.roi_builder.current_roi_intensities[i], self.scoring_params
                )
                for i, r in enumerate(self.roi_builder.live_roi)
            ]
        )
        return new_intensities

LengthFilter

Bases: ScoreFilter

A class that implements a check on minimum length of ROI for fragmentation

Source code in vimms/Exclusion.py
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
class LengthFilter(ScoreFilter):
    """
    A class that implements a check on minimum length of ROI for fragmentation
    """

    def __init__(self, min_roi_length_for_fragmentation):
        """
        Initialise a length filter

        Args:
            min_roi_length_for_fragmentation: the minimum length of ROI for fragmentation
        """
        self.min_roi_length_for_fragmentation = min_roi_length_for_fragmentation

    def filter(self, roi_lengths):
        """
        Check that ROI lengths are above the threshold
        Args:
            roi_lengths: a numpy array of ROI lengths

        Returns: an array of indicator whether the lengths are above threshold

        """
        return roi_lengths >= self.min_roi_length_for_fragmentation

__init__(min_roi_length_for_fragmentation)

Initialise a length filter

Parameters:
  • min_roi_length_for_fragmentation

    the minimum length of ROI for fragmentation

Source code in vimms/Exclusion.py
554
555
556
557
558
559
560
561
def __init__(self, min_roi_length_for_fragmentation):
    """
    Initialise a length filter

    Args:
        min_roi_length_for_fragmentation: the minimum length of ROI for fragmentation
    """
    self.min_roi_length_for_fragmentation = min_roi_length_for_fragmentation

filter(roi_lengths)

Check that ROI lengths are above the threshold Args: roi_lengths: a numpy array of ROI lengths

Returns: an array of indicator whether the lengths are above threshold

Source code in vimms/Exclusion.py
563
564
565
566
567
568
569
570
571
572
def filter(self, roi_lengths):
    """
    Check that ROI lengths are above the threshold
    Args:
        roi_lengths: a numpy array of ROI lengths

    Returns: an array of indicator whether the lengths are above threshold

    """
    return roi_lengths >= self.min_roi_length_for_fragmentation

MS2PlannerController

Bases: FixedScansController

A controller that interfaces with MS2Planner, as described in:

Zuo, Zeyuan, et al. "MS2Planner: improved fragmentation spectra coverage in untargeted mass spectrometry by iterative optimized data acquisition." Bioinformatics 37.Supplement_1 (2021): i231-i236.

Source code in vimms/Controller/misc.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
class MS2PlannerController(FixedScansController):
    """
    A controller that interfaces with MS2Planner, as described in:

    Zuo, Zeyuan, et al. "MS2Planner: improved fragmentation spectra coverage in
    untargeted mass spectrometry by iterative optimized data acquisition."
    Bioinformatics 37.Supplement_1 (2021): i231-i236.
    """

    @staticmethod
    def boxfile2ms2planner(reader, inpath, outpath):
        """
        Transform peak-picked box file to ms2planner default format.

        Args:
            inpath: Path to input box file.
            outpath: Path to output file used in MS2Planner input.

        Returns: None
        """

        out_headers = ["Mass [m/z]", "retention_time", "charge", "Blank", "Sample"]

        records = []
        fs_names, line_ls = reader.read_aligned_csv(inpath)
        for i, (row_fields, mzml_fields) in enumerate(line_ls):
            if len(list(mzml_fields.keys())) > 1:
                raise NotImplementedError(
                    "MS2Planner controller doesn't currently handle aligned experiment"
                )
            # not sure if it even makes sense to try and use an aligned file with
            # MS2Planner
            # but handle the file as if it was aligned in case this
            # more general code will be useful later
            statuses = ((mzml, inner["status"].upper()) for mzml, inner in mzml_fields.items())
            mzmls = [mzml for mzml, s in statuses if s == "DETECTED" or s == "ESTIMATED"]
            if mzmls != []:
                records.append(
                    [
                        row_fields["row m/z"],
                        float(row_fields["row retention time"]) * 60,
                        mzml_fields[mzmls[0]]["charge"],
                        0.0,
                        mean(float(mzml_fields[mzml]["height"]) for mzml in mzmls),
                    ]
                )

        records.sort(key=lambda r: r[1])

        with open(outpath, "w+") as f:
            f.write(",".join(out_headers) + "\n")
            for r in records:
                f.write(",".join(str(field) for field in r) + "\n")

    @staticmethod
    def mzmine2ms2planner(inpath, outpath):
        """
        Transform MZMine2 box file to ms2planner default format.

        Args:
            inpath: Path to input MZMine2 file.
            outpath: Path to output file used in MS2Planner input.

        Returns: None

        """

        return MS2PlannerController.boxfile2ms2planner(MZMineParams, inpath, outpath)

    @staticmethod
    def minimise_single(x, target):
        if target < 0:
            return 0
        c = int(target // x)
        return min(c, c + 1, key=lambda c: abs(target - c * x))

    @staticmethod
    def minimise_distance(target, *args):
        """
        Solve argmin(a1, a2 ... an)(a1x1 + ... + anxn - t) for
        non-negative integer a1...an and non-negative reals x1...xn, t
        using backtracking search. i.e. Schedule tasks of different fixed
        lengths s.t. the last task ends as close to the target time
        as possible.

        Args:
            target:
            *args:

        Returns: the best coefficients

        """
        best_coefficients = (float("inf"), [])
        stack = [MS2PlannerController.minimise_single(args[0], target)] if len(args) > 0 else []
        while stack != []:
            remainder = target - sum(s * a for s, a in zip(stack, args))
            for i in range(len(stack), len(args)):
                c = MS2PlannerController.minimise_single(args[i], remainder)
                stack.append(c)
                remainder -= c * args[i]
            dist = abs(remainder)
            if not math.isclose(dist, best_coefficients[0]) and dist < best_coefficients[0]:
                best_coefficients = (dist, copy.copy(stack))
            stack.pop()
            while stack != [] and stack[-1] <= 0:
                stack.pop()
            if stack != []:
                stack[-1] -= 1
        return best_coefficients[1]

    @staticmethod
    def parse_ms2planner(fpaths):
        fields = [
            "mz_centre",
            "mz_isolation",
            "duration",
            "rt_start",
            "rt_end",
            "intensity",
            "apex_rt",
            "charge",
        ]

        schedules = []
        for fpath in fpaths:
            schedule = []
            with open(fpath, "r") as f:
                f.readline()
                for ln in f:
                    schedule.append(dict(zip(fields, (float(x) for x in ln.split(",")))))
            schedules.append(schedule)
        return schedules

    @staticmethod
    def sched_dict2params(schedule, scan_duration_dict):
        """
        Scan_duration_dict matches the format of MS scan_duration_dict
        with _fixed_ scan lengths.

        Args:
            schedule:
            scan_duration_dict:

        Returns: new schedule

        """
        time = scan_duration_dict[1]
        new_sched = [get_default_scan_params(scan_id=INITIAL_SCAN_ID)]
        precursor_id = INITIAL_SCAN_ID
        id_count = INITIAL_SCAN_ID + 1

        srted = sorted(schedule, key=lambda s: s["rt_start"])
        print("Schedule times: {}".format([s["rt_start"] for s in srted]))
        print(f"NUM SCANS IN SCHEDULE FILE: {len(schedule)}")
        for ms2 in srted:

            if ms2["rt_start"] - time < scan_duration_dict[1]:
                target = ms2["rt_start"] - time
            else:
                target = ms2["rt_start"] - scan_duration_dict[1] - time

            num_ms1, num_ms2 = MS2PlannerController.minimise_distance(
                target, scan_duration_dict[1], scan_duration_dict[2]
            )

            if ms2["rt_start"] - time >= scan_duration_dict[1]:
                num_ms1 += 1
            num_ms2 += 1  # add the actual scan

            print(f"num_scans: {(num_ms1, num_ms2)}")

            filler_diff = num_ms1 - num_ms2
            fillers = [1 if filler_diff > 0 else 2 for i in range(abs(filler_diff))]
            fillers.extend([1, 2] * min(num_ms1, num_ms2))

            for ms_level in fillers:
                # print(f"sid: {id_count}")
                if ms_level == 1:
                    precursor_id = id_count
                    new_sched.append(get_default_scan_params(scan_id=precursor_id))
                else:
                    new_sched.append(
                        get_dda_scan_param(
                            ms2["mz_centre"],
                            0.0,
                            precursor_id,
                            ms2["mz_isolation"],
                            0.0,
                            0.0,
                            scan_id=id_count,
                        )
                    )
                id_count += 1

            times = [time, scan_duration_dict[1] * num_ms1, scan_duration_dict[2] * num_ms2]
            time = sum(times)

            print(
                f"Start time: {times[0]}, MS1 duration: {times[1]}, "
                f"MS2 duration: {times[2]}, End time: {time}"
            )
            print(f"schedule_length: {len(new_sched)}")
        print(f"Durations: {scan_duration_dict}")

        return new_sched

    @staticmethod
    def from_fullscan(
        ms2planner_dir,
        fullscan_mzmine_table,
        out_file,
        intensity_threshold,
        intensity_ratio,
        num_injections,
        intensity_accu,
        isolation,
        delay,
        min_scan_len,
        max_scan_len,
        scan_duration_dict,
        mode="apex",
        fullscan_file=None,
        restriction=None,
        cluster_method="kNN",
        userpython="python",
        advanced_params=None,
    ):

        converted = os.path.join(os.path.dirname(out_file), "mzmine2ms2planner.txt")
        MS2PlannerController.mzmine2ms2planner(fullscan_mzmine_table, converted)

        process_args = [
            userpython,
            os.path.join(ms2planner_dir, "path_finder.py"),
            mode,
            converted,
            out_file,
            str(intensity_threshold),
            str(intensity_ratio),
            str(num_injections),
            "-intensity_accu",
            str(intensity_accu),
            "-isolation",
            str(isolation),
            "-delay",
            str(delay),
            "-min_scan",
            str(min_scan_len),
            "-max_scan",
            str(max_scan_len),
        ]

        if mode.lower() == "curve":
            if fullscan_file is None or restriction is None:
                raise ValueError(
                    """fullscan_file and restriction arguments must be
                       supplied for curve mode!"""
                )

            process_args.extend(
                [
                    "-infile_raw",
                    str(fullscan_file),
                    "-restriction",
                    str(restriction[0]),
                    str(restriction[1]),
                    "-cluster",
                    str(cluster_method),
                ]
            )

        elif mode.lower() != "apex":
            raise ValueError("Only curve and apex are supported as modes!")

        subprocess.run(process_args)
        out_files = [
            f"{'.'.join(out_file.split('.')[:-1])}_{mode.lower()}_path_{i+1}.csv"
            for i in range(num_injections)
        ]
        schedules = [
            MS2PlannerController.sched_dict2params(sch, scan_duration_dict)
            for sch in MS2PlannerController.parse_ms2planner(out_files)
        ]
        with open(os.path.join(os.path.dirname(out_file), "scan_params.txt"), "w+") as f:
            for i, schedule in enumerate(schedules):
                f.write(f"SCHEDULE {i}\n\n")
                f.write("".join(f"SCAN {j}: {scan}\n\n" for j, scan in enumerate(schedule)))
        return [
            MS2PlannerController(schedule=schedule, advanced_params=advanced_params)
            for schedule in schedules
        ]

boxfile2ms2planner(reader, inpath, outpath) staticmethod

Transform peak-picked box file to ms2planner default format.

Parameters:
  • inpath

    Path to input box file.

  • outpath

    Path to output file used in MS2Planner input.

Returns: None

Source code in vimms/Controller/misc.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
@staticmethod
def boxfile2ms2planner(reader, inpath, outpath):
    """
    Transform peak-picked box file to ms2planner default format.

    Args:
        inpath: Path to input box file.
        outpath: Path to output file used in MS2Planner input.

    Returns: None
    """

    out_headers = ["Mass [m/z]", "retention_time", "charge", "Blank", "Sample"]

    records = []
    fs_names, line_ls = reader.read_aligned_csv(inpath)
    for i, (row_fields, mzml_fields) in enumerate(line_ls):
        if len(list(mzml_fields.keys())) > 1:
            raise NotImplementedError(
                "MS2Planner controller doesn't currently handle aligned experiment"
            )
        # not sure if it even makes sense to try and use an aligned file with
        # MS2Planner
        # but handle the file as if it was aligned in case this
        # more general code will be useful later
        statuses = ((mzml, inner["status"].upper()) for mzml, inner in mzml_fields.items())
        mzmls = [mzml for mzml, s in statuses if s == "DETECTED" or s == "ESTIMATED"]
        if mzmls != []:
            records.append(
                [
                    row_fields["row m/z"],
                    float(row_fields["row retention time"]) * 60,
                    mzml_fields[mzmls[0]]["charge"],
                    0.0,
                    mean(float(mzml_fields[mzml]["height"]) for mzml in mzmls),
                ]
            )

    records.sort(key=lambda r: r[1])

    with open(outpath, "w+") as f:
        f.write(",".join(out_headers) + "\n")
        for r in records:
            f.write(",".join(str(field) for field in r) + "\n")

minimise_distance(target, *args) staticmethod

Solve argmin(a1, a2 ... an)(a1x1 + ... + anxn - t) for non-negative integer a1...an and non-negative reals x1...xn, t using backtracking search. i.e. Schedule tasks of different fixed lengths s.t. the last task ends as close to the target time as possible.

Parameters:
  • target
  • *args

Returns: the best coefficients

Source code in vimms/Controller/misc.py
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
@staticmethod
def minimise_distance(target, *args):
    """
    Solve argmin(a1, a2 ... an)(a1x1 + ... + anxn - t) for
    non-negative integer a1...an and non-negative reals x1...xn, t
    using backtracking search. i.e. Schedule tasks of different fixed
    lengths s.t. the last task ends as close to the target time
    as possible.

    Args:
        target:
        *args:

    Returns: the best coefficients

    """
    best_coefficients = (float("inf"), [])
    stack = [MS2PlannerController.minimise_single(args[0], target)] if len(args) > 0 else []
    while stack != []:
        remainder = target - sum(s * a for s, a in zip(stack, args))
        for i in range(len(stack), len(args)):
            c = MS2PlannerController.minimise_single(args[i], remainder)
            stack.append(c)
            remainder -= c * args[i]
        dist = abs(remainder)
        if not math.isclose(dist, best_coefficients[0]) and dist < best_coefficients[0]:
            best_coefficients = (dist, copy.copy(stack))
        stack.pop()
        while stack != [] and stack[-1] <= 0:
            stack.pop()
        if stack != []:
            stack[-1] -= 1
    return best_coefficients[1]

mzmine2ms2planner(inpath, outpath) staticmethod

Transform MZMine2 box file to ms2planner default format.

Parameters:
  • inpath

    Path to input MZMine2 file.

  • outpath

    Path to output file used in MS2Planner input.

Returns: None

Source code in vimms/Controller/misc.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
@staticmethod
def mzmine2ms2planner(inpath, outpath):
    """
    Transform MZMine2 box file to ms2planner default format.

    Args:
        inpath: Path to input MZMine2 file.
        outpath: Path to output file used in MS2Planner input.

    Returns: None

    """

    return MS2PlannerController.boxfile2ms2planner(MZMineParams, inpath, outpath)

sched_dict2params(schedule, scan_duration_dict) staticmethod

Scan_duration_dict matches the format of MS scan_duration_dict with fixed scan lengths.

Parameters:
  • schedule
  • scan_duration_dict

Returns: new schedule

Source code in vimms/Controller/misc.py
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
@staticmethod
def sched_dict2params(schedule, scan_duration_dict):
    """
    Scan_duration_dict matches the format of MS scan_duration_dict
    with _fixed_ scan lengths.

    Args:
        schedule:
        scan_duration_dict:

    Returns: new schedule

    """
    time = scan_duration_dict[1]
    new_sched = [get_default_scan_params(scan_id=INITIAL_SCAN_ID)]
    precursor_id = INITIAL_SCAN_ID
    id_count = INITIAL_SCAN_ID + 1

    srted = sorted(schedule, key=lambda s: s["rt_start"])
    print("Schedule times: {}".format([s["rt_start"] for s in srted]))
    print(f"NUM SCANS IN SCHEDULE FILE: {len(schedule)}")
    for ms2 in srted:

        if ms2["rt_start"] - time < scan_duration_dict[1]:
            target = ms2["rt_start"] - time
        else:
            target = ms2["rt_start"] - scan_duration_dict[1] - time

        num_ms1, num_ms2 = MS2PlannerController.minimise_distance(
            target, scan_duration_dict[1], scan_duration_dict[2]
        )

        if ms2["rt_start"] - time >= scan_duration_dict[1]:
            num_ms1 += 1
        num_ms2 += 1  # add the actual scan

        print(f"num_scans: {(num_ms1, num_ms2)}")

        filler_diff = num_ms1 - num_ms2
        fillers = [1 if filler_diff > 0 else 2 for i in range(abs(filler_diff))]
        fillers.extend([1, 2] * min(num_ms1, num_ms2))

        for ms_level in fillers:
            # print(f"sid: {id_count}")
            if ms_level == 1:
                precursor_id = id_count
                new_sched.append(get_default_scan_params(scan_id=precursor_id))
            else:
                new_sched.append(
                    get_dda_scan_param(
                        ms2["mz_centre"],
                        0.0,
                        precursor_id,
                        ms2["mz_isolation"],
                        0.0,
                        0.0,
                        scan_id=id_count,
                    )
                )
            id_count += 1

        times = [time, scan_duration_dict[1] * num_ms1, scan_duration_dict[2] * num_ms2]
        time = sum(times)

        print(
            f"Start time: {times[0]}, MS1 duration: {times[1]}, "
            f"MS2 duration: {times[2]}, End time: {time}"
        )
        print(f"schedule_length: {len(new_sched)}")
    print(f"Durations: {scan_duration_dict}")

    return new_sched

MZMineParams dataclass

Bases: AbstractParams

Wrapper class to run MZMine 2 peak-picking from the ViMMS codebase. MZMine 2 allows commands for its processing pipeline to be stored in an .xml and then run via command line using its "batch mode" executable. Given an appropriate "template" .xml this class will substitute input and output file names into it and then run it in batch mode via subprocess.

NOTE: MZMine is not installed with ViMMS. It must be installed separately and the path to the "batch mode" executable specified for this class.

Parameters:
  • mzmine_template (str) –

    Path to .xml template giving batch commands.

  • mzmine_exe (str) –

    Path to batch mode executable.

Source code in vimms/PeakPicking.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
@dataclass
class MZMineParams(AbstractParams):
    """
    Wrapper class to run MZMine 2 peak-picking from the ViMMS codebase.
    MZMine 2 allows commands for its processing pipeline to be stored in an .xml
    and then run via command line using its "batch mode" executable. Given an
    appropriate "template" .xml this class will substitute input and output file
    names into it and then run it in batch mode via subprocess.

    NOTE: MZMine is not installed with ViMMS. It must be installed separately
    and the path to the "batch mode" executable specified for this class.

    Args:
        mzmine_template: Path to .xml template giving batch commands.
        mzmine_exe: Path to batch mode executable.
    """

    method_name = "MZMine"

    RT_FACTOR = 60  # minutes

    mzmine_template: str
    mzmine_exe: str

    def _make_batch_file(self, input_files, output_dir, output_name, output_path):
        et = ElementTree.parse(self.mzmine_template)
        root = et.getroot()
        for child in root:

            if child.attrib["method"].endswith("RawDataImportModule"):
                input_found = False
                for e in child:
                    if e.attrib["name"].strip().lower() == "raw data file names":
                        for f in e:
                            e.remove(f)
                        for i, fname in enumerate(input_files):
                            new = ElementTree.SubElement(e, "file")
                            new.text = os.path.abspath(fname)
                            padding = " " * (0 if i == len(input_files) - 1 else 8)
                            new.tail = e.tail + padding
                        input_found = True
                assert input_found, "Couldn't find a place to put the input files in the template!"

            if child.attrib["method"].endswith("CSVExportModule"):
                for e in child:
                    for f in e:
                        if f.tag == "current_file":
                            f.text = output_path

        new_xml = os.path.join(output_dir, f"{output_name}_template.xml")
        et.write(new_xml)
        return new_xml

    def pick_aligned_peaks(self, input_files, output_dir, output_name, force=False):
        """
        Run MZMine batch mode file for a list of input files.

        Args:
            input_files: Iterable of paths to input files.
            output_dir: Directory to write output to.
            output_name: Name for output file. Some text and the file extension
                are added automatically.
            force: When False, don't run peak-picking if a file already exists
                at the output destination.

        Returns: Full path the output file was written to.
        """
        input_files = list(set(input_files))  # filter duplicates
        output_path = self.format_output_path(output_dir, output_name)

        if not os.path.exists(output_path) or force:
            new_xml = self._make_batch_file(input_files, output_dir, output_name, output_path)
            print(f"Running MZMine for {output_path}")
            subprocess.run([self.mzmine_exe, new_xml])

        report_boxes("MZMine", output_path)
        return output_path

    @staticmethod
    def check_files_match(fullscan_names, aligned_path, mode="subset"):
        """
        Check that the source files listed in the header of a peak-picking
        output match an input list.

        Args:
            fullscan_names: List of .mzml files (or paths to them) to look
                for in the header of the aligned file.
            aligned_path: Full filepath to the aligned file.
            mode: "subset" just checks if all fullscan_names can be found in
                the header. "exact" checks whether or not the two sets of
                names exactly match.

        Returns: Tuple of boolean reporting whether test succeeded, the
            names of the fullscans given as input, and the names of files
            found in the header.
        """
        fs_names = {os.path.basename(fs) for fs in fullscan_names}
        mzmine_names = set()

        with open(aligned_path, "r") as f:
            headers = f.readline().split(",")
            pattern = re.compile(r"(.*\.mzML).*")

            for h in headers:
                for fs in fs_names:
                    m = pattern.match(h)
                    if m is not None:
                        mzmine_names.add(m.group(1))

        mode = mode.lower()
        if mode == "exact":
            passed = not fs_names ^ mzmine_names
        elif mode == "subset":
            passed = not fs_names - mzmine_names
        else:
            raise ValueError("Mode not recognised")

        return passed, fs_names, mzmine_names

    @staticmethod
    def read_aligned_csv(box_file_path):
        """
        Parse in an aligned boxfile in MZMine 2 format. Each column
        in an aligned boxfile either has properties related to the whole
        row (e.g. average m/z of the peak aligned on that row) or a property
        specific property of an unaligned peak from a parent .mzML. Row
        properties are parsed into a list of dictionaries (one dictionary
        per row) in the form [{property_name: value}, ...]. .mzML properties
        are loaded into a similar list but with a nested dictionary
        i.e. [{mzml_name: {property_name: value}}, ...].

        Args:
            box_file_path: Full path to the aligned boxfile.

        Returns: Tuple of .mzML names and iterable of pairs of row dicts
            and .mzML dicts.
        """

        row_headers = ["row ID", "row m/z", "row retention time"]

        with open(box_file_path, "r") as f:
            headers = f.readline().split(",")
            row_indices, mzml_indices = {}, defaultdict(dict)

            pattern = re.compile(r"(.*)\.mzML filtered Peak ([a-zA-Z/]+( [a-zA-Z/]+)*)")
            for i, h in enumerate(headers):
                if h in row_headers:
                    row_indices[h] = i
                else:
                    m = pattern.match(h)
                    if m is not None:
                        mzml_indices[m.group(1)][m.group(2)] = i

            fullscan_names = mzml_indices.keys()
            row_ls, mzml_ls = [], []
            for ln in f:
                split = ln.split(",")
                row_ls.append({k: split[i] for k, i in row_indices.items()})
                mzml_ls.append(
                    {
                        mzml: {k: split[i] for k, i in inner.items()}
                        for mzml, inner in mzml_indices.items()
                    }
                )

            return fullscan_names, zip(row_ls, mzml_ls)

check_files_match(fullscan_names, aligned_path, mode='subset') staticmethod

Check that the source files listed in the header of a peak-picking output match an input list.

Parameters:
  • fullscan_names

    List of .mzml files (or paths to them) to look for in the header of the aligned file.

  • aligned_path

    Full filepath to the aligned file.

  • mode

    "subset" just checks if all fullscan_names can be found in the header. "exact" checks whether or not the two sets of names exactly match.

Tuple of boolean reporting whether test succeeded, the
  • names of the fullscans given as input, and the names of files

  • found in the header.

Source code in vimms/PeakPicking.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
@staticmethod
def check_files_match(fullscan_names, aligned_path, mode="subset"):
    """
    Check that the source files listed in the header of a peak-picking
    output match an input list.

    Args:
        fullscan_names: List of .mzml files (or paths to them) to look
            for in the header of the aligned file.
        aligned_path: Full filepath to the aligned file.
        mode: "subset" just checks if all fullscan_names can be found in
            the header. "exact" checks whether or not the two sets of
            names exactly match.

    Returns: Tuple of boolean reporting whether test succeeded, the
        names of the fullscans given as input, and the names of files
        found in the header.
    """
    fs_names = {os.path.basename(fs) for fs in fullscan_names}
    mzmine_names = set()

    with open(aligned_path, "r") as f:
        headers = f.readline().split(",")
        pattern = re.compile(r"(.*\.mzML).*")

        for h in headers:
            for fs in fs_names:
                m = pattern.match(h)
                if m is not None:
                    mzmine_names.add(m.group(1))

    mode = mode.lower()
    if mode == "exact":
        passed = not fs_names ^ mzmine_names
    elif mode == "subset":
        passed = not fs_names - mzmine_names
    else:
        raise ValueError("Mode not recognised")

    return passed, fs_names, mzmine_names

pick_aligned_peaks(input_files, output_dir, output_name, force=False)

Run MZMine batch mode file for a list of input files.

Parameters:
  • input_files

    Iterable of paths to input files.

  • output_dir

    Directory to write output to.

  • output_name

    Name for output file. Some text and the file extension are added automatically.

  • force

    When False, don't run peak-picking if a file already exists at the output destination.

Returns: Full path the output file was written to.

Source code in vimms/PeakPicking.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def pick_aligned_peaks(self, input_files, output_dir, output_name, force=False):
    """
    Run MZMine batch mode file for a list of input files.

    Args:
        input_files: Iterable of paths to input files.
        output_dir: Directory to write output to.
        output_name: Name for output file. Some text and the file extension
            are added automatically.
        force: When False, don't run peak-picking if a file already exists
            at the output destination.

    Returns: Full path the output file was written to.
    """
    input_files = list(set(input_files))  # filter duplicates
    output_path = self.format_output_path(output_dir, output_name)

    if not os.path.exists(output_path) or force:
        new_xml = self._make_batch_file(input_files, output_dir, output_name, output_path)
        print(f"Running MZMine for {output_path}")
        subprocess.run([self.mzmine_exe, new_xml])

    report_boxes("MZMine", output_path)
    return output_path

read_aligned_csv(box_file_path) staticmethod

Parse in an aligned boxfile in MZMine 2 format. Each column in an aligned boxfile either has properties related to the whole row (e.g. average m/z of the peak aligned on that row) or a property specific property of an unaligned peak from a parent .mzML. Row properties are parsed into a list of dictionaries (one dictionary per row) in the form [{property_name: value}, ...]. .mzML properties are loaded into a similar list but with a nested dictionary i.e. [{mzml_name: {property_name: value}}, ...].

Parameters:
  • box_file_path

    Full path to the aligned boxfile.

Tuple of .mzML names and iterable of pairs of row dicts
  • and .mzML dicts.

Source code in vimms/PeakPicking.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
@staticmethod
def read_aligned_csv(box_file_path):
    """
    Parse in an aligned boxfile in MZMine 2 format. Each column
    in an aligned boxfile either has properties related to the whole
    row (e.g. average m/z of the peak aligned on that row) or a property
    specific property of an unaligned peak from a parent .mzML. Row
    properties are parsed into a list of dictionaries (one dictionary
    per row) in the form [{property_name: value}, ...]. .mzML properties
    are loaded into a similar list but with a nested dictionary
    i.e. [{mzml_name: {property_name: value}}, ...].

    Args:
        box_file_path: Full path to the aligned boxfile.

    Returns: Tuple of .mzML names and iterable of pairs of row dicts
        and .mzML dicts.
    """

    row_headers = ["row ID", "row m/z", "row retention time"]

    with open(box_file_path, "r") as f:
        headers = f.readline().split(",")
        row_indices, mzml_indices = {}, defaultdict(dict)

        pattern = re.compile(r"(.*)\.mzML filtered Peak ([a-zA-Z/]+( [a-zA-Z/]+)*)")
        for i, h in enumerate(headers):
            if h in row_headers:
                row_indices[h] = i
            else:
                m = pattern.match(h)
                if m is not None:
                    mzml_indices[m.group(1)][m.group(2)] = i

        fullscan_names = mzml_indices.keys()
        row_ls, mzml_ls = [], []
        for ln in f:
            split = ln.split(",")
            row_ls.append({k: split[i] for k, i in row_indices.items()})
            mzml_ls.append(
                {
                    mzml: {k: split[i] for k, i in inner.items()}
                    for mzml, inner in mzml_indices.items()
                }
            )

        return fullscan_names, zip(row_ls, mzml_ls)

MatchingController

Bases: FixedScansController

A pre-scheduled controller that executes a scan queue planned by a maximum bipartite matching from [vimms.Matching][].

Source code in vimms/Controller/misc.py
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
class MatchingController(FixedScansController):
    """
    A pre-scheduled controller that executes a scan queue planned by
    a maximum bipartite matching from [vimms.Matching][].
    """

    @classmethod
    def from_matching(cls, matching, isolation_width, advanced_params=None, task_filter=None):
        """
        Construct a list of MatchingControllers (one for each injection)
        from a potentially multi-injection matching.

        Args:
            matching: Instance of [vimms.Matching.Matching][].
            isolation_width: Isolation width in Daltons.
            advanced_params: Instance of [vimms.Controller.base.AdvancedParams][].
            task_filter: Object that dynamically returns tasks on request. For
                example [vimms.Controller.misc.TaskFilter][] can be used to
                resynchronise unexpected RTs with the expected ones in the schedule.
        """
        return [
            MatchingController(
                schedule=schedule,
                advanced_params=advanced_params,
                expected_rts=rts,
                task_filter=task_filter,
            )
            for schedule, rts in zip(*matching.make_schedules(isolation_width))
        ]

from_matching(matching, isolation_width, advanced_params=None, task_filter=None) classmethod

Construct a list of MatchingControllers (one for each injection) from a potentially multi-injection matching.

Parameters:
  • matching

    Instance of [vimms.Matching.Matching][].

  • isolation_width

    Isolation width in Daltons.

  • advanced_params
  • task_filter

    Object that dynamically returns tasks on request. For example vimms.Controller.misc.TaskFilter can be used to resynchronise unexpected RTs with the expected ones in the schedule.

Source code in vimms/Controller/misc.py
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
@classmethod
def from_matching(cls, matching, isolation_width, advanced_params=None, task_filter=None):
    """
    Construct a list of MatchingControllers (one for each injection)
    from a potentially multi-injection matching.

    Args:
        matching: Instance of [vimms.Matching.Matching][].
        isolation_width: Isolation width in Daltons.
        advanced_params: Instance of [vimms.Controller.base.AdvancedParams][].
        task_filter: Object that dynamically returns tasks on request. For
            example [vimms.Controller.misc.TaskFilter][] can be used to
            resynchronise unexpected RTs with the expected ones in the schedule.
    """
    return [
        MatchingController(
            schedule=schedule,
            advanced_params=advanced_params,
            expected_rts=rts,
            task_filter=task_filter,
        )
        for schedule, rts in zip(*matching.make_schedules(isolation_width))
    ]

MinIntensityFilter

Bases: ScoreFilter

A class that implements minimum intensity filter

Source code in vimms/Exclusion.py
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
class MinIntensityFilter(ScoreFilter):
    """
    A class that implements minimum intensity filter
    """

    def __init__(self, min_ms1_intensity):
        """
        Initialises the minimum intensity filter
        Args:
            min_ms1_intensity: the minimum intensity to check
        """
        self.min_ms1_intensity = min_ms1_intensity

    def filter(self, intensities):
        """
        Check whether intensity values are above or below the threshold
        Args:
            intensities: an array of intensity values

        Returns: an array of indicators for the filter

        """
        return np.array(intensities) > self.min_ms1_intensity

__init__(min_ms1_intensity)

Initialises the minimum intensity filter Args: min_ms1_intensity: the minimum intensity to check

Source code in vimms/Exclusion.py
467
468
469
470
471
472
473
def __init__(self, min_ms1_intensity):
    """
    Initialises the minimum intensity filter
    Args:
        min_ms1_intensity: the minimum intensity to check
    """
    self.min_ms1_intensity = min_ms1_intensity

filter(intensities)

Check whether intensity values are above or below the threshold Args: intensities: an array of intensity values

Returns: an array of indicators for the filter

Source code in vimms/Exclusion.py
475
476
477
478
479
480
481
482
483
484
def filter(self, intensities):
    """
    Check whether intensity values are above or below the threshold
    Args:
        intensities: an array of intensity values

    Returns: an array of indicators for the filter

    """
    return np.array(intensities) > self.min_ms1_intensity

MultiIsolationController

Bases: Controller

A controller used to test multiple isolations in a single MS2 scan.

Source code in vimms/Controller/misc.py
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
class MultiIsolationController(Controller):
    """
    A controller used to test multiple isolations in a single MS2 scan.
    """

    def __init__(self, N, isolation_width=DEFAULT_ISOLATION_WIDTH, advanced_params=None):
        """
        Initialise a multi-isolation controller
        Args:
            N: the number of precursor ions to fragment
            isolation_width: isolation width, in Dalton
            advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                             advanced parameters to control the mass spec. If left to None,
                             default values will be used.
        """
        super().__init__(advanced_params=advanced_params)
        assert N > 1
        self.N = N
        self.isolation_width = isolation_width
        self.mz_tol = 10
        self.rt_tol = 15

    def _make_scan_order(self, N):
        # makes a list of tuples, each saying which precuror idx in the sorted
        # list should be in which MS2 scan
        initial_idx = range(N)
        scan_order = []
        for L in range(1, len(initial_idx) + 1):
            for subset in itertools.combinations(initial_idx, L):
                scan_order.append(subset)
        return scan_order

    def _process_scan(self, scan):
        # if there's a previous ms1 scan to process
        new_tasks = []
        # fragmented_count = 0
        if self.scan_to_process is not None:
            mzs = self.scan_to_process.mzs
            intensities = self.scan_to_process.intensities
            # rt = self.scan_to_process.rt
            idx = np.argsort(intensities)[::-1]
            precursor_scan_id = self.scan_to_process.scan_id
            scan_order = self._make_scan_order(min(self.N, len(mzs)))

            for subset in scan_order:
                mz = []
                intensity = []
                for s in subset:
                    mz.append(mzs[idx[s]])
                    intensity.append(mzs[idx[s]])
                dda_scan_params = self.get_ms2_scan_params(
                    mz,
                    intensity,
                    precursor_scan_id,
                    self.isolation_width,
                    self.mz_tol,
                    self.rt_tol,
                )

                new_tasks.append(dda_scan_params)
                self.current_task_id += 1

            ms1_scan_params = self.get_ms1_scan_params()
            self.current_task_id += 1
            self.next_processed_scan_id = self.current_task_id
            new_tasks.append(ms1_scan_params)

        return new_tasks

    def update_state_after_scan(self, scan):
        pass

__init__(N, isolation_width=DEFAULT_ISOLATION_WIDTH, advanced_params=None)

Initialise a multi-isolation controller Args: N: the number of precursor ions to fragment isolation_width: isolation width, in Dalton advanced_params: an vimms.Controller.base.AdvancedParams object that contains advanced parameters to control the mass spec. If left to None, default values will be used.

Source code in vimms/Controller/misc.py
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
def __init__(self, N, isolation_width=DEFAULT_ISOLATION_WIDTH, advanced_params=None):
    """
    Initialise a multi-isolation controller
    Args:
        N: the number of precursor ions to fragment
        isolation_width: isolation width, in Dalton
        advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                         advanced parameters to control the mass spec. If left to None,
                         default values will be used.
    """
    super().__init__(advanced_params=advanced_params)
    assert N > 1
    self.N = N
    self.isolation_width = isolation_width
    self.mz_tol = 10
    self.rt_tol = 15

NonOverlapController

Bases: TopNEXtController

A controller that implements the non-overlapping idea to determine how regions-of-interests should be fragmented across injections.

Source code in vimms/Controller/box.py
203
204
205
206
207
208
209
210
211
class NonOverlapController(TopNEXtController):
    """
    A controller that implements the `non-overlapping` idea to determine how regions-of-interests
    should be fragmented across injections.
    """

    def _overlap_scores(self):
        weights = np.array([self.grid.non_overlap(r) for r in self.roi_builder.live_roi])
        return weights

ReTopNController

Bases: TopNEXtController

Reimplementation of the topN controller in the topNEXt framework, allowing it to use features like inclusion boxes.

Source code in vimms/Controller/box.py
153
154
155
156
157
158
159
160
class ReTopNController(TopNEXtController):
    """
    Reimplementation of the topN controller in the topNEXt framework,
    allowing it to use features like inclusion boxes.
    """

    def _overlap_scores(self):
        return np.array([1 for r in self.roi_builder.live_roi])

RoiBuilder

A class to construct ROIs. This can be used in real-time to track ROIs in a controller, or for extracting ROIs from an mzML file.

Source code in vimms/Roi.py
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
class RoiBuilder:
    """
    A class to construct ROIs. This can be used in real-time to track ROIs
    in a controller, or for extracting ROIs from an mzML file.
    """

    def __init__(self, roi_params, smartroi_params=None):
        """
        Initialises an ROI Builder object.

        Args:
            roi_params: parameters for ROI building, as defined in [vimms.Roi.RoiBuilderParams][].
            smartroi_params: other SmartROI parameters, as defined in
                             [vimms.Roi.SmartRoiParams][].
            grid: a grid object, if available
        """
        self.roi_params = roi_params
        self.smartroi_params = smartroi_params

        self.roi_type = ROI_TYPE_NORMAL
        if self.smartroi_params is not None:
            self.roi_type = ROI_TYPE_SMART
        assert self.roi_type in [ROI_TYPE_NORMAL, ROI_TYPE_SMART]

        # Create ROI
        self.live_roi = []
        self.dead_roi = []
        self.junk_roi = []

        # fragmentation to Roi dictionaries
        self.frag_roi_dicts = []  # scan_id, roi_id, precursor_intensity
        self.roi_id_counter = 0

        # count how many times an ROI is not grown
        self.skipped_roi_count = defaultdict(int)

    # flake8: noqa: C901
    def update_roi(self, new_scan):
        """
        Updates ROI in real-time based on incoming scans

        Args:
            new_scan: a newly arriving Scan object

        Returns: None

        """
        if new_scan.ms_level == 1:

            # Sort ROIs in live_roi according to their m/z values.
            # Ensure that the live roi fragmented flags and the last RT
            # are also consistent with the sorting order.
            self.live_roi.sort()

            # Current scan retention time of the MS1 scan is the RT of all
            # points in this scan
            current_rt = new_scan.rt

            # check that there's no ROI with skip_count > self.max_skips_allowed
            skip_counts = np.array(list(self.skipped_roi_count.values()))
            assert np.sum(skip_counts > self.roi_params.max_gaps_allowed) == 0

            # The set of ROIs that are not grown yet.
            # Initially all currently live ROIs are included, and they're
            # removed once grown.
            not_grew = set(self.live_roi)

            # For every (mz, intensity) in scan ..
            for idx in range(len(new_scan.intensities)):
                current_intensity = new_scan.intensities[idx]
                current_mz = new_scan.mzs[idx]

                if current_intensity >= self.roi_params.min_roi_intensity:

                    # Create a dummy ROI object to represent the current m/z
                    # value. This produces either a normal ROI or smart ROI
                    # object, depending on self.roi_type
                    roi = self._get_roi_obj(current_mz, 0, 0, None)

                    # Match dummy ROI to currently live ROIs based on mean
                    # m/z values. If no match, then return None
                    match_roi = self._match(roi, self.live_roi, self.roi_params.mz_tol)
                    if match_roi:

                        # Got a match, so we grow this ROI
                        match_roi.add(current_mz, current_rt, current_intensity)

                        # ROI has been matched and can be removed from not_grew
                        if match_roi in not_grew:
                            not_grew.remove(match_roi)

                            # this ROI has been grown so delete from skip count if possible
                            try:
                                del self.skipped_roi_count[match_roi]
                            except KeyError:
                                pass

                    else:

                        # No match, so create a new ROI and insert it in the
                        # right place in the sorted list
                        new_roi = self._get_roi_obj(
                            current_mz, current_rt, current_intensity, self.roi_id_counter
                        )
                        self.roi_id_counter += 1
                        bisect.insort_right(self.live_roi, new_roi)

            # Separate the ROIs that have not been grown into dead or junk ROIs
            # Dead ROIs are longer than self.min_roi_length but they haven't
            # been grown. Junk ROIs are too short and not grown.
            for roi in not_grew:

                # if too much gaps ...
                self.skipped_roi_count[roi] += 1
                if self.skipped_roi_count[roi] > self.roi_params.max_gaps_allowed:

                    # then set the roi to either dead or junk (depending on length)
                    if roi.get_num_unique_scans() >= self.roi_params.min_roi_length:
                        self.dead_roi.append(roi)
                    else:
                        self.junk_roi.append(roi)

                    # Remove not-grown ROI from the list of live ROIs
                    pos = self.live_roi.index(roi)
                    del self.live_roi[pos]

                    # this ROI is either dead or junk, so delete from skip count
                    # as we don't need to track it anymore
                    del self.skipped_roi_count[roi]

            self.current_roi_ids = [roi.id for roi in self.live_roi]
            self.current_roi_mzs = [roi.mz_list[-1] for roi in self.live_roi]
            self.current_roi_intensities = [roi.intensity_list[-1] for roi in self.live_roi]
            self.current_roi_length = np.array(
                [roi.get_num_unique_scans() for roi in self.live_roi]
            )

    # flake8: noqa: C901
    def _match(self, mz, roi_list, mz_tol):
        """
        Find the RoI that a particular mz falls into. If it falls into nothing,
        return None.

        Args:
            mz: an ROI object containing the m/z we want to find
            roi_list: the list of other ROIs to determine where to place the
                      mz ROI into
            mz_tol: m/z tolerance. This is the window above and below the
                    mean_mz of the RoI.
                    E.g. if mz_tol = 10 ppm, then it looks plus and minus 10 ppm

        Returns: the ROI that has been matched, or None if not found.

        """
        if len(roi_list) == 0:
            return None
        pos = bisect.bisect_right(roi_list, mz)

        if pos == len(roi_list):
            dist_left = 1e6 * (mz.mean_mz - roi_list[pos - 1].mean_mz) / mz.mean_mz

            if dist_left < mz_tol:
                return roi_list[pos - 1]
            else:
                return None

        elif pos == 0:
            dist_right = 1e6 * (roi_list[pos].mean_mz - mz.mean_mz) / mz.mean_mz

            if dist_right < mz_tol:
                return roi_list[pos]
            else:
                return None

        else:
            dist_left = 1e6 * (mz.mean_mz - roi_list[pos - 1].mean_mz) / mz.mean_mz
            dist_right = 1e6 * (roi_list[pos].mean_mz - mz.mean_mz) / mz.mean_mz

            if dist_left < mz_tol < dist_right:
                return roi_list[pos - 1]
            elif dist_left > mz_tol > dist_right:
                return roi_list[pos]
            elif dist_left < mz_tol and dist_right < mz_tol:
                if dist_left <= dist_right:
                    return roi_list[pos - 1]
                else:
                    return roi_list[pos]
            else:
                return None

    def _get_roi_obj(self, mz, rt, intensity, roi_id):
        """
        Constructs a new ROI object based on the currently defined type in
        this builder

        Args:
            mz: the m/z value
            rt: the RT value
            intensity: the intensity value
            roi_id: the ROI id

        Returns: a new ROI object

        """
        if self.roi_type == ROI_TYPE_NORMAL:
            roi = Roi(mz, rt, intensity, id=roi_id)
        elif self.roi_type == ROI_TYPE_SMART:
            assert self.smartroi_params is not None
            roi = SmartRoi(mz, rt, intensity, self.smartroi_params, id=roi_id)
        return roi

    def get_mz_intensity(self, i):
        """
        Returns the (m/z, intensity, ROI ID) value of point at position i in
        this ROI

        Args:
            i: the index of point to return

        Returns: a tuple of (mz, intensity, roi ID)

        """
        mz = self.current_roi_mzs[i]
        intensity = self.current_roi_intensities[i]
        roi_id = self.current_roi_ids[i]
        return mz, intensity, roi_id

    def set_fragmented(self, current_task_id, i, roi_id, rt, intensity):
        """
        Updates this ROI to indicate that it has been fragmented

        Args:
            current_task_id:  the current task ID
            i: index of fragmented ROI in the live ROI list
            roi_id: the ID of ROI
            rt: time of fragmentation
            intensity: intensity at fragmentation

        Returns: None

        """
        self.live_roi[i].fragmented(rt)

        # Add information on which scan has fragmented this ROI
        self.frag_roi_dicts.append(
            {"scan_id": current_task_id, "roi_id": roi_id, "precursor_intensity": intensity}
        )

        # need to track for intensity non-overlap
        self.live_roi[i].max_fragmentation_intensity = max(
            self.live_roi[i].max_fragmentation_intensity, intensity
        )

    def add_scan_to_roi(self, scan):
        """
        Stores the information on which scans and frag events are associated
        to this ROI
        """
        frag_event_ids = np.array([event["scan_id"] for event in self.frag_roi_dicts])
        which_event = np.where(frag_event_ids == scan.scan_id)[0]
        live_roi_ids = np.array([roi.id for roi in self.live_roi])
        which_roi = np.where(live_roi_ids == self.frag_roi_dicts[which_event[0]]["roi_id"])[0]
        if len(which_roi) > 0:
            self.live_roi[which_roi[0]].add_fragmentation_event(
                scan, self.frag_roi_dicts[which_event[0]]["precursor_intensity"]
            )
            del self.frag_roi_dicts[which_event[0]]
        else:
            pass  # hopefully shouldnt happen

    def get_rois(self):
        """
        Returns all ROIs
        """
        return self.live_roi + self.dead_roi

    def get_good_rois(self):
        """
        Returns all ROIs above filtering criteria
        """
        # length check
        filtered_roi = [
            roi
            for roi in self.live_roi
            if roi.get_num_unique_scans() >= self.roi_params.min_roi_length
        ]

        # intensity check:
        # Keep only the ROIs that can be fragmented above
        # at_least_one_point_above threshold.
        all_roi = filtered_roi + self.dead_roi
        if self.roi_params.at_least_one_point_above > 0:
            keep = []
            for roi in all_roi:
                if any(it > self.roi_params.at_least_one_point_above for it in roi.intensity_list):
                    keep.append(roi)
        else:
            keep = all_roi
        return keep

__init__(roi_params, smartroi_params=None)

Initialises an ROI Builder object.

Parameters:
Source code in vimms/Roi.py
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
def __init__(self, roi_params, smartroi_params=None):
    """
    Initialises an ROI Builder object.

    Args:
        roi_params: parameters for ROI building, as defined in [vimms.Roi.RoiBuilderParams][].
        smartroi_params: other SmartROI parameters, as defined in
                         [vimms.Roi.SmartRoiParams][].
        grid: a grid object, if available
    """
    self.roi_params = roi_params
    self.smartroi_params = smartroi_params

    self.roi_type = ROI_TYPE_NORMAL
    if self.smartroi_params is not None:
        self.roi_type = ROI_TYPE_SMART
    assert self.roi_type in [ROI_TYPE_NORMAL, ROI_TYPE_SMART]

    # Create ROI
    self.live_roi = []
    self.dead_roi = []
    self.junk_roi = []

    # fragmentation to Roi dictionaries
    self.frag_roi_dicts = []  # scan_id, roi_id, precursor_intensity
    self.roi_id_counter = 0

    # count how many times an ROI is not grown
    self.skipped_roi_count = defaultdict(int)

add_scan_to_roi(scan)

Stores the information on which scans and frag events are associated to this ROI

Source code in vimms/Roi.py
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
def add_scan_to_roi(self, scan):
    """
    Stores the information on which scans and frag events are associated
    to this ROI
    """
    frag_event_ids = np.array([event["scan_id"] for event in self.frag_roi_dicts])
    which_event = np.where(frag_event_ids == scan.scan_id)[0]
    live_roi_ids = np.array([roi.id for roi in self.live_roi])
    which_roi = np.where(live_roi_ids == self.frag_roi_dicts[which_event[0]]["roi_id"])[0]
    if len(which_roi) > 0:
        self.live_roi[which_roi[0]].add_fragmentation_event(
            scan, self.frag_roi_dicts[which_event[0]]["precursor_intensity"]
        )
        del self.frag_roi_dicts[which_event[0]]
    else:
        pass  # hopefully shouldnt happen

get_good_rois()

Returns all ROIs above filtering criteria

Source code in vimms/Roi.py
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
def get_good_rois(self):
    """
    Returns all ROIs above filtering criteria
    """
    # length check
    filtered_roi = [
        roi
        for roi in self.live_roi
        if roi.get_num_unique_scans() >= self.roi_params.min_roi_length
    ]

    # intensity check:
    # Keep only the ROIs that can be fragmented above
    # at_least_one_point_above threshold.
    all_roi = filtered_roi + self.dead_roi
    if self.roi_params.at_least_one_point_above > 0:
        keep = []
        for roi in all_roi:
            if any(it > self.roi_params.at_least_one_point_above for it in roi.intensity_list):
                keep.append(roi)
    else:
        keep = all_roi
    return keep

get_mz_intensity(i)

Returns the (m/z, intensity, ROI ID) value of point at position i in this ROI

Parameters:
  • i

    the index of point to return

Returns: a tuple of (mz, intensity, roi ID)

Source code in vimms/Roi.py
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
def get_mz_intensity(self, i):
    """
    Returns the (m/z, intensity, ROI ID) value of point at position i in
    this ROI

    Args:
        i: the index of point to return

    Returns: a tuple of (mz, intensity, roi ID)

    """
    mz = self.current_roi_mzs[i]
    intensity = self.current_roi_intensities[i]
    roi_id = self.current_roi_ids[i]
    return mz, intensity, roi_id

get_rois()

Returns all ROIs

Source code in vimms/Roi.py
797
798
799
800
801
def get_rois(self):
    """
    Returns all ROIs
    """
    return self.live_roi + self.dead_roi

set_fragmented(current_task_id, i, roi_id, rt, intensity)

Updates this ROI to indicate that it has been fragmented

Parameters:
  • current_task_id

    the current task ID

  • i

    index of fragmented ROI in the live ROI list

  • roi_id

    the ID of ROI

  • rt

    time of fragmentation

  • intensity

    intensity at fragmentation

Returns: None

Source code in vimms/Roi.py
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
def set_fragmented(self, current_task_id, i, roi_id, rt, intensity):
    """
    Updates this ROI to indicate that it has been fragmented

    Args:
        current_task_id:  the current task ID
        i: index of fragmented ROI in the live ROI list
        roi_id: the ID of ROI
        rt: time of fragmentation
        intensity: intensity at fragmentation

    Returns: None

    """
    self.live_roi[i].fragmented(rt)

    # Add information on which scan has fragmented this ROI
    self.frag_roi_dicts.append(
        {"scan_id": current_task_id, "roi_id": roi_id, "precursor_intensity": intensity}
    )

    # need to track for intensity non-overlap
    self.live_roi[i].max_fragmentation_intensity = max(
        self.live_roi[i].max_fragmentation_intensity, intensity
    )

update_roi(new_scan)

Updates ROI in real-time based on incoming scans

Parameters:
  • new_scan

    a newly arriving Scan object

Returns: None

Source code in vimms/Roi.py
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
def update_roi(self, new_scan):
    """
    Updates ROI in real-time based on incoming scans

    Args:
        new_scan: a newly arriving Scan object

    Returns: None

    """
    if new_scan.ms_level == 1:

        # Sort ROIs in live_roi according to their m/z values.
        # Ensure that the live roi fragmented flags and the last RT
        # are also consistent with the sorting order.
        self.live_roi.sort()

        # Current scan retention time of the MS1 scan is the RT of all
        # points in this scan
        current_rt = new_scan.rt

        # check that there's no ROI with skip_count > self.max_skips_allowed
        skip_counts = np.array(list(self.skipped_roi_count.values()))
        assert np.sum(skip_counts > self.roi_params.max_gaps_allowed) == 0

        # The set of ROIs that are not grown yet.
        # Initially all currently live ROIs are included, and they're
        # removed once grown.
        not_grew = set(self.live_roi)

        # For every (mz, intensity) in scan ..
        for idx in range(len(new_scan.intensities)):
            current_intensity = new_scan.intensities[idx]
            current_mz = new_scan.mzs[idx]

            if current_intensity >= self.roi_params.min_roi_intensity:

                # Create a dummy ROI object to represent the current m/z
                # value. This produces either a normal ROI or smart ROI
                # object, depending on self.roi_type
                roi = self._get_roi_obj(current_mz, 0, 0, None)

                # Match dummy ROI to currently live ROIs based on mean
                # m/z values. If no match, then return None
                match_roi = self._match(roi, self.live_roi, self.roi_params.mz_tol)
                if match_roi:

                    # Got a match, so we grow this ROI
                    match_roi.add(current_mz, current_rt, current_intensity)

                    # ROI has been matched and can be removed from not_grew
                    if match_roi in not_grew:
                        not_grew.remove(match_roi)

                        # this ROI has been grown so delete from skip count if possible
                        try:
                            del self.skipped_roi_count[match_roi]
                        except KeyError:
                            pass

                else:

                    # No match, so create a new ROI and insert it in the
                    # right place in the sorted list
                    new_roi = self._get_roi_obj(
                        current_mz, current_rt, current_intensity, self.roi_id_counter
                    )
                    self.roi_id_counter += 1
                    bisect.insort_right(self.live_roi, new_roi)

        # Separate the ROIs that have not been grown into dead or junk ROIs
        # Dead ROIs are longer than self.min_roi_length but they haven't
        # been grown. Junk ROIs are too short and not grown.
        for roi in not_grew:

            # if too much gaps ...
            self.skipped_roi_count[roi] += 1
            if self.skipped_roi_count[roi] > self.roi_params.max_gaps_allowed:

                # then set the roi to either dead or junk (depending on length)
                if roi.get_num_unique_scans() >= self.roi_params.min_roi_length:
                    self.dead_roi.append(roi)
                else:
                    self.junk_roi.append(roi)

                # Remove not-grown ROI from the list of live ROIs
                pos = self.live_roi.index(roi)
                del self.live_roi[pos]

                # this ROI is either dead or junk, so delete from skip count
                # as we don't need to track it anymore
                del self.skipped_roi_count[roi]

        self.current_roi_ids = [roi.id for roi in self.live_roi]
        self.current_roi_mzs = [roi.mz_list[-1] for roi in self.live_roi]
        self.current_roi_intensities = [roi.intensity_list[-1] for roi in self.live_roi]
        self.current_roi_length = np.array(
            [roi.get_num_unique_scans() for roi in self.live_roi]
        )

RoiController

Bases: TopNController

An ROI based controller with multiple options

Source code in vimms/Controller/roi.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
class RoiController(TopNController):
    """
    An ROI based controller with multiple options
    """

    def __init__(
        self,
        ionisation_mode,
        isolation_width,
        N,
        mz_tol,
        rt_tol,
        min_ms1_intensity,
        roi_params,
        smartroi_params=None,
        min_roi_length_for_fragmentation=0,
        ms1_shift=0,
        advanced_params=None,
        exclusion_method=ROI_EXCLUSION_DEW,
        exclusion_t_0=None,
    ):
        """
        Initialise an ROI-based controller
        Args:
            ionisation_mode: ionisation mode, either POSITIVE or NEGATIVE
            isolation_width: isolation width in Dalton
            N: the number of highest-score precursor ions to fragment
            mz_tol: m/z tolerance -- m/z tolerance for dynamic exclusion window
            rt_tol: RT tolerance -- RT tolerance for dynamic exclusion window
            min_ms1_intensity: the minimum intensity to fragment a precursor ion
            roi_params: an instance of [vimms.Roi.RoiBuilderParams][] that describes
                        how to build ROIs in real time based on incoming scans.
            smartroi_params: an instance of [vimms.Roi.SmartRoiParams][]. If provided, then
                             the SmartROI rules (as described in the paper) will be used to select
                             which ROI to fragment. Otherwise set to None to use standard ROIs.
            min_roi_length_for_fragmentation: how long a ROI should be before it can be fragmented.
            ms1_shift: advanced parameter -- best to leave it.
            advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                             advanced parameters to control the mass spec. If left to None,
                             default values will be used.
            exclusion_method: an instance of [vimms.Exclusion.TopNExclusion][] or its subclasses,
                              used to describe how to perform dynamic exclusion so that precursors
                              that have been fragmented are not fragmented again.
            exclusion_t_0: parameter for WeightedDEW exclusion (refer to paper for details).
        """
        super().__init__(
            ionisation_mode,
            N,
            isolation_width,
            mz_tol,
            rt_tol,
            min_ms1_intensity,
            ms1_shift=ms1_shift,
            advanced_params=advanced_params,
        )
        self.min_roi_length_for_fragmentation = min_roi_length_for_fragmentation  # noqa
        self.roi_builder = RoiBuilder(roi_params, smartroi_params=smartroi_params)

        self.exclusion_method = exclusion_method
        assert self.exclusion_method in [ROI_EXCLUSION_DEW, ROI_EXCLUSION_WEIGHTED_DEW]
        if self.exclusion_method == ROI_EXCLUSION_WEIGHTED_DEW:
            assert exclusion_t_0 is not None, "Must be a number"
            assert exclusion_t_0 < rt_tol, "Impossible combination"
            self.exclusion = WeightedDEWExclusion(mz_tol, rt_tol, exclusion_t_0)

        self.exclusion_t_0 = exclusion_t_0

    def schedule_ms1(self, new_tasks):
        """
        Schedule a new MS1 scan by creating a new default MS1 [vimms.Common.ScanParameters][].
        Args:
            new_tasks: the list of new tasks in the environment

        Returns: None

        """
        ms1_scan_params = self.get_ms1_scan_params()
        self.current_task_id += 1
        self.next_processed_scan_id = self.current_task_id
        new_tasks.append(ms1_scan_params)

    class MS2Scheduler:
        """
        A class that performs MS2 scheduling of tasks
        """

        def __init__(self, parent):
            """
            Initialises an MS2 scheduler
            Args:
                parent: the parent controller class
            """
            self.parent = parent
            self.fragmented_count = 0

        def schedule_ms2s(self, new_tasks, ms2_tasks, mz, intensity):
            """
            Schedule a new MS2 scan by creating a new default MS2 [vimms.Common.ScanParameters][].
            Args:
                new_tasks: the list of new tasks in the environment
                ms2_tasks: the list of MS2 tasks in the environment
                mz: the precursor m/z to fragment
                intensity: the precusor intensity to fragment

            Returns: None

            """
            precursor_scan_id = self.parent.scan_to_process.scan_id
            dda_scan_params = self.parent.get_ms2_scan_params(
                mz,
                intensity,
                precursor_scan_id,
                self.parent.isolation_width,
                self.parent.mz_tol,
                self.parent.rt_tol,
            )
            new_tasks.append(dda_scan_params)
            ms2_tasks.append(dda_scan_params)
            self.parent.current_task_id += 1
            self.fragmented_count += 1

    def _set_fragmented(self, i, roi_id, rt, intensity):
        self.roi_builder.set_fragmented(self.current_task_id, i, roi_id, rt, intensity)

    def _process_scan(self, scan):
        if self.scan_to_process is not None:
            assert self.scan_to_process == scan

            # keep growing ROIs if we encounter a new ms1 scan
            self.roi_builder.update_roi(scan)
            new_tasks, ms2_tasks = [], []
            rt = self.scan_to_process.rt

            done_ms1, ms2s, scores = False, self.MS2Scheduler(self), self._get_scores()
            for i in np.argsort(scores)[::-1]:
                # stopping criteria is done based on the scores
                if scores[i] <= 0:
                    break

                mz, intensity, roi_id = self.roi_builder.get_mz_intensity(i)
                ms2s.schedule_ms2s(new_tasks, ms2_tasks, mz, intensity)
                self._set_fragmented(i, roi_id, rt, intensity)

                if ms2s.fragmented_count == self.N - self.ms1_shift:
                    self.schedule_ms1(new_tasks)
                    done_ms1 = True

            # if no ms1 has been added, then add at the end
            if not done_ms1:
                self.schedule_ms1(new_tasks)

            # create new exclusion items based on the scheduled ms2 tasks
            if self.exclusion is not None:
                self.exclusion.update(self.scan_to_process, ms2_tasks)

            # set this ms1 scan as has been processed
            self.scan_to_process = None
            return new_tasks

        elif scan.ms_level == 2:  # add ms2 scans to Rois
            self.roi_builder.add_scan_to_roi(scan)
            return []

    ###########################################################################
    # Scoring functions
    ###########################################################################

    def _log_roi_intensities(self):
        """
        Scores ROI by their log intensity values

        Returns: a numpy array of log intensity scores

        """
        return np.log(self.roi_builder.current_roi_intensities)

    def _min_intensity_filter(self):
        """
        Filter ROIs by minimum intensity threshold

        Returns: indicators whether ROIs pass the check

        """
        f = MinIntensityFilter(self.min_ms1_intensity)
        return f.filter(self.roi_builder.current_roi_intensities)

    def _time_filter(self):
        """
        Filter ROIs by dynamic exclusion

        Returns: indicators whether ROIs pass the check

        """
        if self.exclusion_method == ROI_EXCLUSION_DEW:
            f = DEWFilter(self.rt_tol)
            return f.filter(self.scan_to_process.rt, self.roi_builder.live_roi)

        elif self.exclusion_method == ROI_EXCLUSION_WEIGHTED_DEW:
            f = WeightedDEWFilter(self.exclusion)
            return f.filter(self.scan_to_process.rt, self.roi_builder.live_roi)

    def _length_filter(self):
        """
        Filter ROI based on their length (>= min_roi_length_for_fragmentation)

        Returns: indicators whether ROIs pass the check

        """
        f = LengthFilter(self.min_roi_length_for_fragmentation)
        return f.filter(self.roi_builder.current_roi_length)

    def _smartroi_filter(self):
        """
        Filter ROI based on the SmartROI rules

        Returns: indicators whether ROIs pass the check

        """
        f = SmartROIFilter()
        return f.filter(self.roi_builder.live_roi)

    def _score_filters(self):
        """
        Combine various filtering criteria

        Returns: the combined scoring criteria

        """
        return self._min_intensity_filter() * self._time_filter() * self._length_filter()

    def _get_dda_scores(self):
        """
        Compute DDA scores = log roi intensities * the different scoring criteria
        Returns:

        """
        return self._log_roi_intensities() * self._score_filters()

    def _get_top_N_scores(self, scores):
        """
        Select the topN highest scores and set the rest to 0.

        Args:
            scores: a numpy array of scores

        Returns: same scores, but keeping only the top-N largest values.

        """
        if len(scores) > self.N:  # number of fragmentation events filter
            scores[scores.argsort()[: (len(scores) - self.N)]] = 0
        return scores

    def _get_scores(self):
        """
        Computes scores used to rank precursor ions to fragment

        Returns: an array of scores. Highest scoring ions should be selected first.

        """
        NotImplementedError()

MS2Scheduler

A class that performs MS2 scheduling of tasks

Source code in vimms/Controller/roi.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class MS2Scheduler:
    """
    A class that performs MS2 scheduling of tasks
    """

    def __init__(self, parent):
        """
        Initialises an MS2 scheduler
        Args:
            parent: the parent controller class
        """
        self.parent = parent
        self.fragmented_count = 0

    def schedule_ms2s(self, new_tasks, ms2_tasks, mz, intensity):
        """
        Schedule a new MS2 scan by creating a new default MS2 [vimms.Common.ScanParameters][].
        Args:
            new_tasks: the list of new tasks in the environment
            ms2_tasks: the list of MS2 tasks in the environment
            mz: the precursor m/z to fragment
            intensity: the precusor intensity to fragment

        Returns: None

        """
        precursor_scan_id = self.parent.scan_to_process.scan_id
        dda_scan_params = self.parent.get_ms2_scan_params(
            mz,
            intensity,
            precursor_scan_id,
            self.parent.isolation_width,
            self.parent.mz_tol,
            self.parent.rt_tol,
        )
        new_tasks.append(dda_scan_params)
        ms2_tasks.append(dda_scan_params)
        self.parent.current_task_id += 1
        self.fragmented_count += 1

__init__(parent)

Initialises an MS2 scheduler Args: parent: the parent controller class

Source code in vimms/Controller/roi.py
107
108
109
110
111
112
113
114
def __init__(self, parent):
    """
    Initialises an MS2 scheduler
    Args:
        parent: the parent controller class
    """
    self.parent = parent
    self.fragmented_count = 0

schedule_ms2s(new_tasks, ms2_tasks, mz, intensity)

Schedule a new MS2 scan by creating a new default MS2 vimms.Common.ScanParameters. Args: new_tasks: the list of new tasks in the environment ms2_tasks: the list of MS2 tasks in the environment mz: the precursor m/z to fragment intensity: the precusor intensity to fragment

Returns: None

Source code in vimms/Controller/roi.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def schedule_ms2s(self, new_tasks, ms2_tasks, mz, intensity):
    """
    Schedule a new MS2 scan by creating a new default MS2 [vimms.Common.ScanParameters][].
    Args:
        new_tasks: the list of new tasks in the environment
        ms2_tasks: the list of MS2 tasks in the environment
        mz: the precursor m/z to fragment
        intensity: the precusor intensity to fragment

    Returns: None

    """
    precursor_scan_id = self.parent.scan_to_process.scan_id
    dda_scan_params = self.parent.get_ms2_scan_params(
        mz,
        intensity,
        precursor_scan_id,
        self.parent.isolation_width,
        self.parent.mz_tol,
        self.parent.rt_tol,
    )
    new_tasks.append(dda_scan_params)
    ms2_tasks.append(dda_scan_params)
    self.parent.current_task_id += 1
    self.fragmented_count += 1

__init__(ionisation_mode, isolation_width, N, mz_tol, rt_tol, min_ms1_intensity, roi_params, smartroi_params=None, min_roi_length_for_fragmentation=0, ms1_shift=0, advanced_params=None, exclusion_method=ROI_EXCLUSION_DEW, exclusion_t_0=None)

Initialise an ROI-based controller Args: ionisation_mode: ionisation mode, either POSITIVE or NEGATIVE isolation_width: isolation width in Dalton N: the number of highest-score precursor ions to fragment mz_tol: m/z tolerance -- m/z tolerance for dynamic exclusion window rt_tol: RT tolerance -- RT tolerance for dynamic exclusion window min_ms1_intensity: the minimum intensity to fragment a precursor ion roi_params: an instance of vimms.Roi.RoiBuilderParams that describes how to build ROIs in real time based on incoming scans. smartroi_params: an instance of vimms.Roi.SmartRoiParams. If provided, then the SmartROI rules (as described in the paper) will be used to select which ROI to fragment. Otherwise set to None to use standard ROIs. min_roi_length_for_fragmentation: how long a ROI should be before it can be fragmented. ms1_shift: advanced parameter -- best to leave it. advanced_params: an vimms.Controller.base.AdvancedParams object that contains advanced parameters to control the mass spec. If left to None, default values will be used. exclusion_method: an instance of vimms.Exclusion.TopNExclusion or its subclasses, used to describe how to perform dynamic exclusion so that precursors that have been fragmented are not fragmented again. exclusion_t_0: parameter for WeightedDEW exclusion (refer to paper for details).

Source code in vimms/Controller/roi.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def __init__(
    self,
    ionisation_mode,
    isolation_width,
    N,
    mz_tol,
    rt_tol,
    min_ms1_intensity,
    roi_params,
    smartroi_params=None,
    min_roi_length_for_fragmentation=0,
    ms1_shift=0,
    advanced_params=None,
    exclusion_method=ROI_EXCLUSION_DEW,
    exclusion_t_0=None,
):
    """
    Initialise an ROI-based controller
    Args:
        ionisation_mode: ionisation mode, either POSITIVE or NEGATIVE
        isolation_width: isolation width in Dalton
        N: the number of highest-score precursor ions to fragment
        mz_tol: m/z tolerance -- m/z tolerance for dynamic exclusion window
        rt_tol: RT tolerance -- RT tolerance for dynamic exclusion window
        min_ms1_intensity: the minimum intensity to fragment a precursor ion
        roi_params: an instance of [vimms.Roi.RoiBuilderParams][] that describes
                    how to build ROIs in real time based on incoming scans.
        smartroi_params: an instance of [vimms.Roi.SmartRoiParams][]. If provided, then
                         the SmartROI rules (as described in the paper) will be used to select
                         which ROI to fragment. Otherwise set to None to use standard ROIs.
        min_roi_length_for_fragmentation: how long a ROI should be before it can be fragmented.
        ms1_shift: advanced parameter -- best to leave it.
        advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                         advanced parameters to control the mass spec. If left to None,
                         default values will be used.
        exclusion_method: an instance of [vimms.Exclusion.TopNExclusion][] or its subclasses,
                          used to describe how to perform dynamic exclusion so that precursors
                          that have been fragmented are not fragmented again.
        exclusion_t_0: parameter for WeightedDEW exclusion (refer to paper for details).
    """
    super().__init__(
        ionisation_mode,
        N,
        isolation_width,
        mz_tol,
        rt_tol,
        min_ms1_intensity,
        ms1_shift=ms1_shift,
        advanced_params=advanced_params,
    )
    self.min_roi_length_for_fragmentation = min_roi_length_for_fragmentation  # noqa
    self.roi_builder = RoiBuilder(roi_params, smartroi_params=smartroi_params)

    self.exclusion_method = exclusion_method
    assert self.exclusion_method in [ROI_EXCLUSION_DEW, ROI_EXCLUSION_WEIGHTED_DEW]
    if self.exclusion_method == ROI_EXCLUSION_WEIGHTED_DEW:
        assert exclusion_t_0 is not None, "Must be a number"
        assert exclusion_t_0 < rt_tol, "Impossible combination"
        self.exclusion = WeightedDEWExclusion(mz_tol, rt_tol, exclusion_t_0)

    self.exclusion_t_0 = exclusion_t_0

schedule_ms1(new_tasks)

Schedule a new MS1 scan by creating a new default MS1 vimms.Common.ScanParameters. Args: new_tasks: the list of new tasks in the environment

Returns: None

Source code in vimms/Controller/roi.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def schedule_ms1(self, new_tasks):
    """
    Schedule a new MS1 scan by creating a new default MS1 [vimms.Common.ScanParameters][].
    Args:
        new_tasks: the list of new tasks in the environment

    Returns: None

    """
    ms1_scan_params = self.get_ms1_scan_params()
    self.current_task_id += 1
    self.next_processed_scan_id = self.current_task_id
    new_tasks.append(ms1_scan_params)

SWATH

Bases: Controller

A controller that implements SWATH-MS (Sequential Windowed Acquisition of All Theoretical Fragment Ion Mass Spectra) DIA fragmentation strategy. Should be used in conjunction with MS-DIAL for deconvolution.

Source code in vimms/Controller/dia.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
class SWATH(Controller):
    """
    A controller that implements SWATH-MS (Sequential Windowed Acquisition of All Theoretical
    Fragment Ion Mass Spectra) DIA fragmentation strategy.
    Should be used in conjunction with MS-DIAL for deconvolution.
    """

    def __init__(self, min_mz, max_mz, width, scan_overlap=0, advanced_params=None):
        """
        Initialise a SWATH-MS controller

        Args:
            min_mz: minimum m/z value
            max_mz: maximum m/z value
            width: width of each SWATH window
            scan_overlap: how much can scans overlap across windows
            advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                             advanced parameters to control the mass spec. If left to None,
                             default values will be used.
        """
        super().__init__(advanced_params=advanced_params)
        self.width = width
        self.scan_overlap = scan_overlap
        self.min_mz = min_mz  # scan from this mz
        self.max_mz = max_mz  # scan to this mz

        self.scan_number = self.initial_scan_id
        self.exp_info = []  # experimental information - isolation windows

    def write_msdial_experiment_file(self, filename):
        """
        Generates a file that can be read by MS-DIAL to perform deconvolution

        Args:
            filename: path to experiment file in MS-DIAL format

        Returns: None

        """

        heads = ["Experiment", "MS Type", "Min m/z", "Max m/z"]
        start_mz, stop_mz = self._get_start_stop()
        ms1_mz_range = self.advanced_params.default_ms1_scan_window
        ms1_row = ["0", "SCAN", ms1_mz_range[0], ms1_mz_range[1]]
        swath_rows = []
        for i, start in enumerate(start_mz):
            stop = stop_mz[i]
            new_row = [i + 1, "SWATH", start, stop]
            swath_rows.append(new_row)

        out_dir = os.path.dirname(filename)
        create_if_not_exist(out_dir)

        with open(filename, "w", newline="") as f:
            writer = csv.writer(f, delimiter="\t", dialect="excel")
            writer.writerow(heads)
            writer.writerow(ms1_row)
            for row in swath_rows:
                writer.writerow(row)

    def _get_start_stop(self):
        """
        Computes start and stop m/z values

        Returns: a tuple of start m/z values (list) and stop m/z values (list)

        """
        start = self.min_mz
        start_mz = []
        stop_mz = []
        while start < self.max_mz:
            start_mz.append(start)
            stop_mz.append(start + self.width)
            start += self.width - self.scan_overlap
        return start_mz, stop_mz

    def update_state_after_scan(self, last_scan):
        pass

    def _process_scan(self, scan):
        new_tasks = []
        if self.scan_to_process is not None:

            precursor_scan_id = self.scan_to_process.scan_id

            start_mz, stop_mz = self._get_start_stop()

            isolation_width = self.width

            precursor_mz_list = []
            for i, start in enumerate(start_mz):
                precursor_mz = (stop_mz[i] + start) / 2.0
                precursor_mz_list.append(precursor_mz)

            mz_tol = 10  # not used
            rt_tol = 15  # these are not used
            for mz in precursor_mz_list:
                dda_scan_params = self.get_ms2_scan_params(
                    mz, 0, precursor_scan_id, isolation_width, mz_tol, rt_tol
                )
                # push this dda scan to the mass spec queue
                new_tasks.append(dda_scan_params)

            # make the MS1 scan
            task = self.get_ms1_scan_params()
            new_tasks.append(task)

            self.scan_number += len(precursor_mz_list) + 1
            self.next_processed_scan_id = self.scan_number
        return new_tasks

__init__(min_mz, max_mz, width, scan_overlap=0, advanced_params=None)

Initialise a SWATH-MS controller

Parameters:
  • min_mz

    minimum m/z value

  • max_mz

    maximum m/z value

  • width

    width of each SWATH window

  • scan_overlap

    how much can scans overlap across windows

  • advanced_params

    an vimms.Controller.base.AdvancedParams object that contains advanced parameters to control the mass spec. If left to None, default values will be used.

Source code in vimms/Controller/dia.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def __init__(self, min_mz, max_mz, width, scan_overlap=0, advanced_params=None):
    """
    Initialise a SWATH-MS controller

    Args:
        min_mz: minimum m/z value
        max_mz: maximum m/z value
        width: width of each SWATH window
        scan_overlap: how much can scans overlap across windows
        advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                         advanced parameters to control the mass spec. If left to None,
                         default values will be used.
    """
    super().__init__(advanced_params=advanced_params)
    self.width = width
    self.scan_overlap = scan_overlap
    self.min_mz = min_mz  # scan from this mz
    self.max_mz = max_mz  # scan to this mz

    self.scan_number = self.initial_scan_id
    self.exp_info = []  # experimental information - isolation windows

write_msdial_experiment_file(filename)

Generates a file that can be read by MS-DIAL to perform deconvolution

Parameters:
  • filename

    path to experiment file in MS-DIAL format

Returns: None

Source code in vimms/Controller/dia.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def write_msdial_experiment_file(self, filename):
    """
    Generates a file that can be read by MS-DIAL to perform deconvolution

    Args:
        filename: path to experiment file in MS-DIAL format

    Returns: None

    """

    heads = ["Experiment", "MS Type", "Min m/z", "Max m/z"]
    start_mz, stop_mz = self._get_start_stop()
    ms1_mz_range = self.advanced_params.default_ms1_scan_window
    ms1_row = ["0", "SCAN", ms1_mz_range[0], ms1_mz_range[1]]
    swath_rows = []
    for i, start in enumerate(start_mz):
        stop = stop_mz[i]
        new_row = [i + 1, "SWATH", start, stop]
        swath_rows.append(new_row)

    out_dir = os.path.dirname(filename)
    create_if_not_exist(out_dir)

    with open(filename, "w", newline="") as f:
        writer = csv.writer(f, delimiter="\t", dialect="excel")
        writer.writerow(heads)
        writer.writerow(ms1_row)
        for row in swath_rows:
            writer.writerow(row)

ScanItem

Represents a scan item object. Used by the WeightedDEW controller to store the pair of m/z and intensity values along with their associated weight

Source code in vimms/Controller/topN.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
class ScanItem:
    """
    Represents a scan item object. Used by the WeightedDEW controller to store
    the pair of m/z and intensity values along with their associated weight
    """

    def __init__(self, mz, intensity, weight=1):
        """
        Initialise a ScanItem object
        Args:
            mz: m/z value
            intensity: intensity value
            weight: the weight for this ScanItem
        """
        self.mz = mz
        self.intensity = intensity
        self.weight = weight

    def __lt__(self, other):
        if self.intensity * self.weight <= other.intensity * other.weight:
            return True
        else:
            return False

__init__(mz, intensity, weight=1)

Initialise a ScanItem object Args: mz: m/z value intensity: intensity value weight: the weight for this ScanItem

Source code in vimms/Controller/topN.py
196
197
198
199
200
201
202
203
204
205
206
def __init__(self, mz, intensity, weight=1):
    """
    Initialise a ScanItem object
    Args:
        mz: m/z value
        intensity: intensity value
        weight: the weight for this ScanItem
    """
    self.mz = mz
    self.intensity = intensity
    self.weight = weight

ScanParameters

A class to store parameters used to instruct the mass spec how to generate a scan. This object is usually created by the controller. It is used by the controller to instruct the mass spec what actions (scans) to perform next.

Source code in vimms/Common.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
class ScanParameters:
    """
    A class to store parameters used to instruct the mass spec how to
    generate a scan. This object is usually created by the controller.
    It is used by the controller to instruct the mass spec what actions (scans)
    to perform next.
    """

    MS_LEVEL = "ms_level"
    COLLISION_ENERGY = "collision_energy"
    POLARITY = "polarity"
    FIRST_MASS = "first_mass"
    LAST_MASS = "last_mass"
    ORBITRAP_RESOLUTION = "orbitrap_resolution"
    AGC_TARGET = "agc_target"
    MAX_IT = "max_it"
    MASS_ANALYSER = "analyzer"
    ACTIVATION_TYPE = "activation_type"
    ISOLATION_MODE = "isolation_mode"
    SOURCE_CID_ENERGY = "source_cid_energy"
    METADATA = "metadata"
    UNIQUENESS_TOKEN = "uniqueness_token"

    # this is used for DIA-based controllers to specify which windows to
    # fragment
    ISOLATION_WINDOWS = "isolation_windows"

    # precursor m/z and isolation width have to be specified together for
    # DDA-based controllers
    PRECURSOR_MZ = "precursor_mz"
    ISOLATION_WIDTH = "isolation_width"

    # used in Top-N, hybrid and ROI controllers to perform dynamic exclusion
    DYNAMIC_EXCLUSION_MZ_TOL = "dew_mz_tol"
    DYNAMIC_EXCLUSION_RT_TOL = "dew_rt_tol"

    # only used by the hybrid controller for now, since its Top-N may change
    # depending on time for other DDA controllers it's always the same
    # throughout the whole run, so we don't send this parameter
    CURRENT_TOP_N = "current_top_N"

    # if the scan id is specified, then it should be used by the mass spec
    # useful for pre-scheduled controllers where we want the controller
    # to know the scan ids of MS1, MS2
    # and also the precursor ids of those MS2 scans in advance.
    SCAN_ID = "scan_id"

    def __init__(self):
        """
        Create a scan parameter object
        """
        self.params = {}

    def set(self, key, value):
        """
        Set scan parameter value

        Args:
            key: a scan parameter name
            value: a scan parameter value

        Returns: None
        """
        self.params[key] = value

    def get(self, key):
        """
        Gets scan parameter value

        Args:
            key: the key to look for

        Returns: the corresponding value in this ScanParameter
        """
        if key in self.params:
            return self.params[key]
        else:
            return None

    def get_all(self):
        """
        Get all scan parameters
        Returns: all the scan parameters
        """
        return self.params

    def compute_isolation_windows(self):
        """
        Gets the full-width (DDA) isolation window around a precursor m/z
        """
        precursor_list = self.get(ScanParameters.PRECURSOR_MZ)
        precursor_mz_list = [precursor.precursor_mz for precursor in precursor_list]
        isolation_width_list = self.get(ScanParameters.ISOLATION_WIDTH)
        return compute_isolation_windows(isolation_width_list, precursor_mz_list)

    def __repr__(self):
        return "ScanParameters %s" % (self.params)

__init__()

Create a scan parameter object

Source code in vimms/Common.py
331
332
333
334
335
def __init__(self):
    """
    Create a scan parameter object
    """
    self.params = {}

compute_isolation_windows()

Gets the full-width (DDA) isolation window around a precursor m/z

Source code in vimms/Common.py
370
371
372
373
374
375
376
377
def compute_isolation_windows(self):
    """
    Gets the full-width (DDA) isolation window around a precursor m/z
    """
    precursor_list = self.get(ScanParameters.PRECURSOR_MZ)
    precursor_mz_list = [precursor.precursor_mz for precursor in precursor_list]
    isolation_width_list = self.get(ScanParameters.ISOLATION_WIDTH)
    return compute_isolation_windows(isolation_width_list, precursor_mz_list)

get(key)

Gets scan parameter value

Parameters:
  • key

    the key to look for

Returns: the corresponding value in this ScanParameter

Source code in vimms/Common.py
349
350
351
352
353
354
355
356
357
358
359
360
361
def get(self, key):
    """
    Gets scan parameter value

    Args:
        key: the key to look for

    Returns: the corresponding value in this ScanParameter
    """
    if key in self.params:
        return self.params[key]
    else:
        return None

get_all()

Get all scan parameters Returns: all the scan parameters

Source code in vimms/Common.py
363
364
365
366
367
368
def get_all(self):
    """
    Get all scan parameters
    Returns: all the scan parameters
    """
    return self.params

set(key, value)

Set scan parameter value

Parameters:
  • key

    a scan parameter name

  • value

    a scan parameter value

Returns: None

Source code in vimms/Common.py
337
338
339
340
341
342
343
344
345
346
347
def set(self, key, value):
    """
    Set scan parameter value

    Args:
        key: a scan parameter name
        value: a scan parameter value

    Returns: None
    """
    self.params[key] = value

SimpleMs1Controller

Bases: Controller

A simple MS1 controller which does a full scan of the chemical sample, but no fragmentation

Source code in vimms/Controller/fullscan.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class SimpleMs1Controller(Controller):
    """
    A simple MS1 controller which does a full scan of the chemical sample,
    but no fragmentation
    """

    def __init__(self, advanced_params=None):
        """
        Initialise a full-scan MS1 controller
        Args:
            advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                             advanced parameters to control the mass spec. If left to None,
                             default values will be used.
        """
        super().__init__(advanced_params=advanced_params)

    def _process_scan(self, scan):
        if self.scan_to_process is not None:
            task = self.get_ms1_scan_params()
            self.current_task_id += 1
            self.next_processed_scan_id = self.current_task_id
            self.scan_to_process = None  # set this scan as has been processed
            return [task]

    def update_state_after_scan(self, last_scan):
        pass

__init__(advanced_params=None)

Initialise a full-scan MS1 controller Args: advanced_params: an vimms.Controller.base.AdvancedParams object that contains advanced parameters to control the mass spec. If left to None, default values will be used.

Source code in vimms/Controller/fullscan.py
42
43
44
45
46
47
48
49
50
def __init__(self, advanced_params=None):
    """
    Initialise a full-scan MS1 controller
    Args:
        advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                         advanced parameters to control the mass spec. If left to None,
                         default values will be used.
    """
    super().__init__(advanced_params=advanced_params)

SmartROIFilter

Bases: ScoreFilter

A class that implements SmartROI filtering criteria. For more details, refer to our paper 'Rapid Development ...'

Source code in vimms/Exclusion.py
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
class SmartROIFilter(ScoreFilter):
    """
    A class that implements SmartROI filtering criteria.
    For more details, refer to our paper 'Rapid Development ...'
    """

    def filter(self, rois):
        """
        Filter ROIs based on SmartROI rules.


        Args:
            rois: a list of [vimms.Roi.Roi] objects. if this is a normal ROI object,
                  always return True for everything otherwise track the status based
                  on the SmartROI rules

        Returns: an array of indicator whether ROI can be fragmented or not.

        """
        can_fragments = np.array([roi.can_fragment for roi in rois])
        return can_fragments

filter(rois)

Filter ROIs based on SmartROI rules.

Parameters:
  • rois

    a list of [vimms.Roi.Roi] objects. if this is a normal ROI object, always return True for everything otherwise track the status based on the SmartROI rules

Returns: an array of indicator whether ROI can be fragmented or not.

Source code in vimms/Exclusion.py
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
def filter(self, rois):
    """
    Filter ROIs based on SmartROI rules.


    Args:
        rois: a list of [vimms.Roi.Roi] objects. if this is a normal ROI object,
              always return True for everything otherwise track the status based
              on the SmartROI rules

    Returns: an array of indicator whether ROI can be fragmented or not.

    """
    can_fragments = np.array([roi.can_fragment for roi in rois])
    return can_fragments

TargetedController

Bases: Controller

A controller that is given a list of m/z and RT values to target Attempts to acquire n_replicates of each target at each CE

Source code in vimms/Controller/targeted.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
class TargetedController(Controller):
    """
    A controller that is given a list of m/z and RT values to target
    Attempts to acquire n_replicates of each target at each CE
    """

    def __init__(
        self,
        targets,
        ce_values,
        N=10,
        n_replicates=1,
        min_ms1_intensity=5e3,
        isolation_width=DEFAULT_ISOLATION_WIDTH,
        advanced_params=None,
        limit_acquisition=True,
    ):
        super().__init__(advanced_params=advanced_params)
        self.targets = targets
        self.ce_values = ce_values
        self.n_replicates = n_replicates
        self.N = N
        self.isolation_width = isolation_width
        self.min_ms1_intensity = min_ms1_intensity
        self.limit_acquisition = limit_acquisition

        # these will be removed sometime
        self.mz_tol = 10
        self.rt_tol = 10

        self.target_counts = {}
        for t in self.targets:
            self.target_counts[t] = {c: 0 for c in self.ce_values}

        self.scan_record = []  # keeps track of which scan is which
        self.seen_targets = set()

    def update_state_after_scan(self, last_scan):
        pass

    def _process_scan(self, scan):
        new_tasks = []
        if self.scan_to_process is not None:
            precursor_scan_id = self.scan_to_process.scan_id
            mzs = self.scan_to_process.mzs
            intensities = self.scan_to_process.intensities
            rt = self.scan_to_process.rt
            mzi = list(zip(mzs, intensities))

            active_targets = list(
                filter(lambda x: x.active(mzi, rt, self.min_ms1_intensity), self.targets)
            )
            self.seen_targets.update(active_targets)

            target_list = []
            for t in active_targets:
                for ce in self.target_counts[t]:
                    if self.limit_acquisition and self.target_counts[t][ce] == self.n_replicates:
                        continue
                    else:
                        target_list.append((t, ce, self.target_counts[t][ce]))

            if len(target_list) > 0:
                # prioritise by how far we are below the number of
                # repetitions we want
                target_list.sort(key=lambda x: x[2])
                # make some MS2 scans, upto N
                for i in range(min(len(target_list), self.N)):
                    t, ce, _ = target_list[i]
                    metadata = {}
                    if t.adduct is not None:
                        metadata["adduct"] = t.adduct
                    if t.metadata is not None:
                        # copy the rest of metadata from target
                        metadata.update(t.metadata)
                    dda_scan_params = self.get_ms2_scan_params(
                        t.mz,
                        1e3,
                        precursor_scan_id,
                        self.isolation_width,
                        self.mz_tol,
                        self.rt_tol,
                        metadata=metadata,
                    )
                    dda_scan_params.set(ScanParameters.COLLISION_ENERGY, ce)
                    new_tasks.append(dda_scan_params)
                    self.current_task_id += 1
                    self.target_counts[t][ce] += 1
                    self.scan_record.append([self.current_task_id, t, ce])

            # make the MS1 scan
            ms1_scan_params = self.get_ms1_scan_params()
            self.current_task_id += 1
            self.next_processed_scan_id = self.current_task_id
            self.scan_record.append([self.current_task_id, None, None])
            new_tasks.append(ms1_scan_params)
        return new_tasks

    def summarise_activity(self, output_method):
        output_method("Summary of targets")
        output_method("=================")
        found = set()
        found_names = set()
        unique_names = set([t.name for t in self.targets])
        for target in self.targets:
            output_method(target)
            for c in self.ce_values:
                output_method("\t{} -> {} scans".format(c, self.target_counts[target][c]))
                if self.target_counts[target][c] > 0:
                    found.add(target)
                    found_names.add(target.name)
        output_method("SUMMARY")
        output_method("========")
        output_method("{} out of {} have one or more scans".format(len(found), len(self.targets)))
        output_method(
            "{} of the {} unique names have more than one scan".format(
                len(found_names), len(unique_names)
            )
        )

TaskFilter

Can be used with vimms.Controller.misc.FixedScansController and its subclasses to resynchronise scan RTs if they don't go according to schedule. New scans are dynamically added or deleted if the current RT is before or after the planned RT by a certain threshold (scaled to scan lengths).

Source code in vimms/Controller/misc.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class TaskFilter:
    """
    Can be used with [vimms.Controller.misc.FixedScansController][] and its
    subclasses to resynchronise scan RTs if they don't go according to
    schedule. New scans are dynamically added or deleted if the current RT
    is before or after the planned RT by a certain threshold (scaled to scan
    lengths).
    """

    def __init__(self, ms1_length, ms2_length, skip_margin=0.5, add_margin=1.2):
        """
        Create a new TaskFilter.

        Args:
            ms1_length: Expected length of MS1 scans.
            ms2_length: Expected length of MS2 scans.
            skip_margin: Cancel the current task if current RT is closer
                to expected RT of the following task than skip_margin *
                current_task_length.
            add_margin: Add a filler scan if current RT is further away
                from expected RT of the current task than add_margin *
                min_task_length.
        """
        self.ms1_length = ms1_length
        self.ms2_length = ms2_length
        self.min_length = min(ms1_length, ms2_length)

        self.skip_margin = skip_margin
        self.add_margin = add_margin

    @staticmethod
    def _find_nearest_ms2(task_idx, tasks):
        max_dist = max(task_idx + 1, len(tasks) - task_idx)
        for dist in range(1, max_dist):

            right_idx = task_idx + dist
            if right_idx < len(tasks) and tasks[right_idx].get(ScanParameters.MS_LEVEL) == 2:
                return tasks[right_idx]

            left_idx = task_idx - dist
            if left_idx > -1 and tasks[left_idx].get(ScanParameters.MS_LEVEL) == 2:
                return tasks[left_idx]

        return None

    @classmethod
    def _make_scan(cls, task, scan_id, precursor_id):
        if task.get(ScanParameters.MS_LEVEL) not in [1, 2]:
            raise NotImplementedError(
                f"""{cls} isn't sure how to copy a ScanParameters with
                ms_level = {task.get(ScanParameters.MS_LEVEL)}"""
            )

        s = copy.deepcopy(task)
        s.set(ScanParameters.SCAN_ID, scan_id)
        if s.get(ScanParameters.MS_LEVEL) == 2:
            try:
                # not sure there is any sensible way to change this without directly
                # modifying internals, so here we defend against future changes in
                # ScanParameters
                for p in s.get(ScanParameters.PRECURSOR_MZ):
                    pass
            except Exception:
                raise NotImplementedError(
                    f"""
                    ScanParameters doesn't have the expected internal structure
                    for precursor information. {cls} is probably about to
                    make a horrible mistake.
                    """
                )

            for p in s.get(ScanParameters.PRECURSOR_MZ):
                p.precursor_scan_id = precursor_id
        return s

    @classmethod
    def _make_ms2(cls, task_idx, tasks, scan_id, precursor_id):
        template_scan = TaskFilter._find_nearest_ms2(task_idx, tasks)

        if template_scan is None:
            return get_dda_scan_param(
                precursor_mz=100.0,
                intensity=0.0,
                precursor_scan_id=precursor_id,
                isolation_width=DEFAULT_ISOLATION_WIDTH,
                mz_tol=0.0,
                rt_tol=0.0,
                scan_id=scan_id,
            )
        else:
            return cls._make_scan(tasks[task_idx], scan_id, precursor_id)

    def _get_task_length(self, task):
        if task.get(ScanParameters.MS_LEVEL) == 1:
            return self.ms1_length
        elif task.get(ScanParameters.MS_LEVEL) == 2:
            return self.ms2_length

        raise NotImplementedError(f"MS_LEVEL: {task.get(ScanParameters.MS_LEVEL)}")

    def get_task(self, scan, scan_id, precursor_id, task_idx, expected_rts, tasks):
        """
        Gets the next task and updates the current task index for the
        parent controller.

        Args:
            scan: Current scan from parent.
            scan_id: ID of scan.
            precursor_id: ID of last MS1.
            task_idx: Current index in task queue.
            expected_rts: Queue of expected RTs corresponding to tasks.
            tasks: Full task queue.

        Returns: Tuple of new task index and next task.
        """
        actual_rt = scan.rt + scan.scan_duration
        expected_rt = expected_rts[task_idx]
        rt_dist = expected_rt - actual_rt

        if rt_dist > self.add_margin * self.min_length:
            if self.ms1_length > self.ms2_length:
                if rt_dist > self.add_margin * self.ms1_length:
                    new_task = get_default_scan_params(scan_id=scan_id)
                else:
                    new_task = self._make_ms2(task_idx, tasks, scan_id, precursor_id)
            else:
                if rt_dist > self.add_margin * self.ms2_length:
                    new_task = self._make_ms2(task_idx, tasks, scan_id, precursor_id)
                else:
                    new_task = get_default_scan_params(scan_id=scan_id)
            return task_idx, new_task
        else:
            if task_idx >= len(tasks) - 1:
                return task_idx, self._make_scan(tasks[task_idx], scan_id, precursor_id)

            while actual_rt >= expected_rts[
                task_idx + 1
            ] - self.skip_margin * self._get_task_length(tasks[task_idx]):
                task_idx += 1
                if task_idx >= len(tasks) - 1:
                    return task_idx, self._make_scan(tasks[task_idx], scan_id, precursor_id)

            return task_idx + 1, self._make_scan(tasks[task_idx], scan_id, precursor_id)

__init__(ms1_length, ms2_length, skip_margin=0.5, add_margin=1.2)

Create a new TaskFilter.

Parameters:
  • ms1_length

    Expected length of MS1 scans.

  • ms2_length

    Expected length of MS2 scans.

  • skip_margin

    Cancel the current task if current RT is closer to expected RT of the following task than skip_margin * current_task_length.

  • add_margin

    Add a filler scan if current RT is further away from expected RT of the current task than add_margin * min_task_length.

Source code in vimms/Controller/misc.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def __init__(self, ms1_length, ms2_length, skip_margin=0.5, add_margin=1.2):
    """
    Create a new TaskFilter.

    Args:
        ms1_length: Expected length of MS1 scans.
        ms2_length: Expected length of MS2 scans.
        skip_margin: Cancel the current task if current RT is closer
            to expected RT of the following task than skip_margin *
            current_task_length.
        add_margin: Add a filler scan if current RT is further away
            from expected RT of the current task than add_margin *
            min_task_length.
    """
    self.ms1_length = ms1_length
    self.ms2_length = ms2_length
    self.min_length = min(ms1_length, ms2_length)

    self.skip_margin = skip_margin
    self.add_margin = add_margin

get_task(scan, scan_id, precursor_id, task_idx, expected_rts, tasks)

Gets the next task and updates the current task index for the parent controller.

Parameters:
  • scan

    Current scan from parent.

  • scan_id

    ID of scan.

  • precursor_id

    ID of last MS1.

  • task_idx

    Current index in task queue.

  • expected_rts

    Queue of expected RTs corresponding to tasks.

  • tasks

    Full task queue.

Returns: Tuple of new task index and next task.

Source code in vimms/Controller/misc.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def get_task(self, scan, scan_id, precursor_id, task_idx, expected_rts, tasks):
    """
    Gets the next task and updates the current task index for the
    parent controller.

    Args:
        scan: Current scan from parent.
        scan_id: ID of scan.
        precursor_id: ID of last MS1.
        task_idx: Current index in task queue.
        expected_rts: Queue of expected RTs corresponding to tasks.
        tasks: Full task queue.

    Returns: Tuple of new task index and next task.
    """
    actual_rt = scan.rt + scan.scan_duration
    expected_rt = expected_rts[task_idx]
    rt_dist = expected_rt - actual_rt

    if rt_dist > self.add_margin * self.min_length:
        if self.ms1_length > self.ms2_length:
            if rt_dist > self.add_margin * self.ms1_length:
                new_task = get_default_scan_params(scan_id=scan_id)
            else:
                new_task = self._make_ms2(task_idx, tasks, scan_id, precursor_id)
        else:
            if rt_dist > self.add_margin * self.ms2_length:
                new_task = self._make_ms2(task_idx, tasks, scan_id, precursor_id)
            else:
                new_task = get_default_scan_params(scan_id=scan_id)
        return task_idx, new_task
    else:
        if task_idx >= len(tasks) - 1:
            return task_idx, self._make_scan(tasks[task_idx], scan_id, precursor_id)

        while actual_rt >= expected_rts[
            task_idx + 1
        ] - self.skip_margin * self._get_task_length(tasks[task_idx]):
            task_idx += 1
            if task_idx >= len(tasks) - 1:
                return task_idx, self._make_scan(tasks[task_idx], scan_id, precursor_id)

        return task_idx + 1, self._make_scan(tasks[task_idx], scan_id, precursor_id)

TopNController

Bases: Controller

A controller that implements the standard Top-N DDA fragmentation strategy. Does an MS1 scan followed by N fragmentation scans of the peaks with the highest intensity that are not excluded

Source code in vimms/Controller/topN.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
class TopNController(Controller):
    """
    A controller that implements the standard Top-N DDA fragmentation strategy.
    Does an MS1 scan followed by N fragmentation scans of
    the peaks with the highest intensity that are not excluded
    """

    def __init__(
        self,
        ionisation_mode,
        N,
        isolation_width,
        mz_tol,
        rt_tol,
        min_ms1_intensity,
        ms1_shift=0,
        initial_exclusion_list=None,
        advanced_params=None,
        force_N=False,
        exclude_after_n_times=1,
        exclude_t0=0,
    ):
        """
        Initialise the Top-N controller

        Args:
            ionisation_mode: ionisation mode, either POSITIVE or NEGATIVE
            N: the number of highest-intensity precursor ions to fragment
            isolation_width: isolation width in Dalton
            mz_tol: m/z tolerance -- m/z tolerance for dynamic exclusion window
            rt_tol: RT tolerance -- RT tolerance for dynamic exclusion window
            min_ms1_intensity: the minimum intensity to fragment a precursor ion
            ms1_shift: advanced parameter -- best to leave it.
            initial_exclusion_list: initial list of exclusion boxes
            advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                             advanced parameters to control the mass spec. If left to None,
                             default values will be used.
            force_N: whether to always force N fragmentations.
            exclude_after_n_times; allow ions to NOT be excluded up to n_times.
            exclude_t0: time for initial exclusion check.
        """
        super().__init__(advanced_params=advanced_params)

        self.ionisation_mode = ionisation_mode

        # the top N ions to fragment
        self.N = N

        # the isolation width (in Dalton) to select a precursor ion
        self.isolation_width = isolation_width

        # the m/z window (ppm) to prevent the same precursor ion to be
        # fragmented again
        self.mz_tol = mz_tol

        # the rt window to prevent the same precursor ion to be
        # fragmented again
        self.rt_tol = rt_tol

        # minimum ms1 intensity to fragment
        self.min_ms1_intensity = min_ms1_intensity

        # number of scans to move ms1 scan forward in list of new_tasks
        self.ms1_shift = ms1_shift

        # force it to do N MS2 scans regardless
        self.force_N = force_N

        if self.force_N and ms1_shift > 0:
            logger.warning(
                "Setting force_N to True with non-zero shift can lead to " "strange behaviour"
            )

        self.exclusion = TopNExclusion(
            self.mz_tol,
            self.rt_tol,
            exclude_after_n_times=exclude_after_n_times,
            exclude_t0=exclude_t0,
            initial_exclusion_list=initial_exclusion_list,
        )

    def _process_scan(self, scan):
        # if there's a previous ms1 scan to process
        new_tasks = []
        fragmented_count = 0

        if self.scan_to_process is not None:
            assert self.scan_to_process == scan

            # original scan data
            mzs = self.scan_to_process.mzs
            intensities = self.scan_to_process.intensities
            assert mzs.shape == intensities.shape
            rt = self.scan_to_process.rt

            # loop over points in decreasing intensity
            idx = np.argsort(intensities)[::-1]

            done_ms1 = False
            ms2_tasks = []
            for i in idx:
                mz = mzs[i]
                intensity = intensities[i]

                # stopping criteria is after we've fragmented N ions or
                # we found ion < min_intensity
                if fragmented_count >= self.N:
                    break

                if intensity < self.min_ms1_intensity:
                    logger.debug(
                        "Time %f Minimum intensity threshold %f reached "
                        "at %f, %d" % (rt, self.min_ms1_intensity, intensity, fragmented_count)
                    )
                    break

                # skip ion in the dynamic exclusion list of the mass spec
                is_exc, weight = self.exclusion.is_excluded(mz, rt)
                if is_exc:
                    continue

                # create a new ms2 scan parameter to be sent to the mass spec
                precursor_scan_id = self.scan_to_process.scan_id
                dda_scan_params = self.get_ms2_scan_params(
                    mz,
                    intensity,
                    precursor_scan_id,
                    self.isolation_width,
                    self.mz_tol,
                    self.rt_tol,
                )
                new_tasks.append(dda_scan_params)
                ms2_tasks.append(dda_scan_params)
                fragmented_count += 1
                self.current_task_id += 1

                # add an ms1 here
                if fragmented_count == self.N - self.ms1_shift:
                    ms1_scan_params = self.get_ms1_scan_params()
                    self.current_task_id += 1
                    self.next_processed_scan_id = self.current_task_id
                    new_tasks.append(ms1_scan_params)
                    done_ms1 = True

            if self.force_N and len(new_tasks) < self.N:
                # add some extra tasks.
                n_tasks_remaining = self.N - len(new_tasks)
                for i in range(n_tasks_remaining):
                    precursor_scan_id = self.scan_to_process.scan_id
                    dda_scan_params = self.get_ms2_scan_params(
                        DUMMY_PRECURSOR_MZ,
                        100.0,
                        precursor_scan_id,
                        self.isolation_width,
                        self.mz_tol,
                        self.rt_tol,
                    )
                    new_tasks.append(dda_scan_params)
                    ms2_tasks.append(dda_scan_params)
                    fragmented_count += 1
                    self.current_task_id += 1

            # if no ms1 has been added, then add at the end
            if not done_ms1:
                # if fragmented_count < self.N - self.ms1_shift:
                ms1_scan_params = self.get_ms1_scan_params()
                self.current_task_id += 1
                self.next_processed_scan_id = self.current_task_id
                new_tasks.append(ms1_scan_params)

            # create new exclusion items based on the scheduled ms2 tasks
            self.exclusion.update(self.scan_to_process, ms2_tasks)

            # set this ms1 scan as has been processed
            self.scan_to_process = None
        return new_tasks

    def update_state_after_scan(self, scan):
        pass

__init__(ionisation_mode, N, isolation_width, mz_tol, rt_tol, min_ms1_intensity, ms1_shift=0, initial_exclusion_list=None, advanced_params=None, force_N=False, exclude_after_n_times=1, exclude_t0=0)

Initialise the Top-N controller

Parameters:
  • ionisation_mode

    ionisation mode, either POSITIVE or NEGATIVE

  • N

    the number of highest-intensity precursor ions to fragment

  • isolation_width

    isolation width in Dalton

  • mz_tol

    m/z tolerance -- m/z tolerance for dynamic exclusion window

  • rt_tol

    RT tolerance -- RT tolerance for dynamic exclusion window

  • min_ms1_intensity

    the minimum intensity to fragment a precursor ion

  • ms1_shift

    advanced parameter -- best to leave it.

  • initial_exclusion_list

    initial list of exclusion boxes

  • advanced_params

    an vimms.Controller.base.AdvancedParams object that contains advanced parameters to control the mass spec. If left to None, default values will be used.

  • force_N

    whether to always force N fragmentations.

  • exclude_t0

    time for initial exclusion check.

Source code in vimms/Controller/topN.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def __init__(
    self,
    ionisation_mode,
    N,
    isolation_width,
    mz_tol,
    rt_tol,
    min_ms1_intensity,
    ms1_shift=0,
    initial_exclusion_list=None,
    advanced_params=None,
    force_N=False,
    exclude_after_n_times=1,
    exclude_t0=0,
):
    """
    Initialise the Top-N controller

    Args:
        ionisation_mode: ionisation mode, either POSITIVE or NEGATIVE
        N: the number of highest-intensity precursor ions to fragment
        isolation_width: isolation width in Dalton
        mz_tol: m/z tolerance -- m/z tolerance for dynamic exclusion window
        rt_tol: RT tolerance -- RT tolerance for dynamic exclusion window
        min_ms1_intensity: the minimum intensity to fragment a precursor ion
        ms1_shift: advanced parameter -- best to leave it.
        initial_exclusion_list: initial list of exclusion boxes
        advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                         advanced parameters to control the mass spec. If left to None,
                         default values will be used.
        force_N: whether to always force N fragmentations.
        exclude_after_n_times; allow ions to NOT be excluded up to n_times.
        exclude_t0: time for initial exclusion check.
    """
    super().__init__(advanced_params=advanced_params)

    self.ionisation_mode = ionisation_mode

    # the top N ions to fragment
    self.N = N

    # the isolation width (in Dalton) to select a precursor ion
    self.isolation_width = isolation_width

    # the m/z window (ppm) to prevent the same precursor ion to be
    # fragmented again
    self.mz_tol = mz_tol

    # the rt window to prevent the same precursor ion to be
    # fragmented again
    self.rt_tol = rt_tol

    # minimum ms1 intensity to fragment
    self.min_ms1_intensity = min_ms1_intensity

    # number of scans to move ms1 scan forward in list of new_tasks
    self.ms1_shift = ms1_shift

    # force it to do N MS2 scans regardless
    self.force_N = force_N

    if self.force_N and ms1_shift > 0:
        logger.warning(
            "Setting force_N to True with non-zero shift can lead to " "strange behaviour"
        )

    self.exclusion = TopNExclusion(
        self.mz_tol,
        self.rt_tol,
        exclude_after_n_times=exclude_after_n_times,
        exclude_t0=exclude_t0,
        initial_exclusion_list=initial_exclusion_list,
    )

TopNEXtController

Bases: RoiController

A multi-sample controller that use a grid to track which regions-of-interests (ROIs) have been fragmented across multiple injections.

Source code in vimms/Controller/box.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class TopNEXtController(RoiController):
    """
    A multi-sample controller that use a grid to track which regions-of-interests (ROIs)
    have been fragmented across multiple injections.
    """

    def __init__(
        self,
        ionisation_mode,
        isolation_width,
        N,
        mz_tol,
        rt_tol,
        min_ms1_intensity,
        roi_params,
        grid,
        smartroi_params=None,
        min_roi_length_for_fragmentation=0,
        ms1_shift=0,
        advanced_params=None,
        register_all_roi=False,
        scoring_params=GRID_CONTROLLER_SCORING_PARAMS,
        exclusion_method=ROI_EXCLUSION_DEW,
        exclusion_t_0=None,
    ):
        """
        Create a grid controller.

        Args:
            ionisation_mode: ionisation mode, either POSITIVE or NEGATIVE
            isolation_width: isolation width in Dalton
            N: the number of highest-score precursor ions to fragment
            mz_tol: m/z tolerance -- m/z tolerance for dynamic exclusion window
            rt_tol: RT tolerance -- RT tolerance for dynamic exclusion window
            min_ms1_intensity: the minimum intensity to fragment a precursor ion
            roi_params: an instance of [vimms.Roi.RoiBuilderParams][] that describes
                        how to build ROIs in real time based on incoming scans.
            grid: an instance of BoxManager for exclusion/inclusion boxes.
            smartroi_params: an instance of [vimms.Roi.SmartRoiParams][]. If provided, then
                             the SmartROI rules (as described in the paper) will be used to select
                             which ROI to fragment. Otherwise set to None to use standard ROIs.
            min_roi_length_for_fragmentation: how long a ROI should be before it can be fragmented.
            ms1_shift: advanced parameter -- best to leave it.
            advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                             advanced parameters to control the mass spec. If left to None,
                             default values will be used.
            register_all_roi: whether to register all ROIs or not
            scoring_params: a dictionary of parameters used when calculating scores
            exclusion_method: an instance of [vimms.Exclusion.TopNExclusion][] or its subclasses,
                              used to describe how to perform dynamic exclusion so that precursors
                              that have been fragmented are not fragmented again.
            exclusion_t_0: parameter for WeightedDEW exclusion (refer to paper for details).
        """
        super().__init__(
            ionisation_mode,
            isolation_width,
            N,
            mz_tol,
            rt_tol,
            min_ms1_intensity,
            roi_params,
            smartroi_params=smartroi_params,
            min_roi_length_for_fragmentation=min_roi_length_for_fragmentation,
            ms1_shift=ms1_shift,
            advanced_params=advanced_params,
            exclusion_method=exclusion_method,
            exclusion_t_0=exclusion_t_0,
        )

        self.roi_builder = RoiBuilder(roi_params, smartroi_params=smartroi_params)
        self.grid = grid  # helps us understand previous RoIs
        self.register_all_roi = register_all_roi
        self.scoring_params = scoring_params

    def update_state_after_scan(self, scan):
        super().update_state_after_scan(scan)
        self.grid.send_training_data(scan)

    def _set_fragmented(self, i, roi_id, rt, intensity):
        super()._set_fragmented(i, roi_id, rt, intensity)
        self.grid.register_roi(self.roi_builder.live_roi[i])

    def _add_inclusion_scores(self, scores):
        if self.roi_builder.live_roi == []:
            return scores

        maxm = np.max(scores)
        return scores + np.array(
            [
                (
                    maxm
                    if self.grid.point_in_box(Point(r[-1][0], r[-1][1]), idx=self.grid.IN_GEOM)
                    else 0.0
                )
                for r in self.roi_builder.live_roi
            ]
        )

    def _get_scores(self):
        if self.roi_builder.live_roi != []:
            rt = max(r.max_rt for r in self.roi_builder.live_roi)
            self.grid.set_active_boxes(rt)

        dda_scores = self._log_roi_intensities() * self._overlap_scores()
        inclusion_scores = self._add_inclusion_scores(dda_scores)

        if self.roi_builder.roi_type == ROI_TYPE_SMART:  # smart ROI scoring
            final_scores = (
                inclusion_scores * self._smartroi_filter() * self._min_intensity_filter()
            )
        else:  # normal ROI
            final_scores = inclusion_scores * self._score_filters()

        return self._get_top_N_scores(final_scores)

    def after_injection_cleanup(self):
        self.grid.update_after_injection()

__init__(ionisation_mode, isolation_width, N, mz_tol, rt_tol, min_ms1_intensity, roi_params, grid, smartroi_params=None, min_roi_length_for_fragmentation=0, ms1_shift=0, advanced_params=None, register_all_roi=False, scoring_params=GRID_CONTROLLER_SCORING_PARAMS, exclusion_method=ROI_EXCLUSION_DEW, exclusion_t_0=None)

Create a grid controller.

Parameters:
  • ionisation_mode

    ionisation mode, either POSITIVE or NEGATIVE

  • isolation_width

    isolation width in Dalton

  • N

    the number of highest-score precursor ions to fragment

  • mz_tol

    m/z tolerance -- m/z tolerance for dynamic exclusion window

  • rt_tol

    RT tolerance -- RT tolerance for dynamic exclusion window

  • min_ms1_intensity

    the minimum intensity to fragment a precursor ion

  • roi_params

    an instance of vimms.Roi.RoiBuilderParams that describes how to build ROIs in real time based on incoming scans.

  • grid

    an instance of BoxManager for exclusion/inclusion boxes.

  • smartroi_params

    an instance of vimms.Roi.SmartRoiParams. If provided, then the SmartROI rules (as described in the paper) will be used to select which ROI to fragment. Otherwise set to None to use standard ROIs.

  • min_roi_length_for_fragmentation

    how long a ROI should be before it can be fragmented.

  • ms1_shift

    advanced parameter -- best to leave it.

  • advanced_params

    an vimms.Controller.base.AdvancedParams object that contains advanced parameters to control the mass spec. If left to None, default values will be used.

  • register_all_roi

    whether to register all ROIs or not

  • scoring_params

    a dictionary of parameters used when calculating scores

  • exclusion_method

    an instance of vimms.Exclusion.TopNExclusion or its subclasses, used to describe how to perform dynamic exclusion so that precursors that have been fragmented are not fragmented again.

  • exclusion_t_0

    parameter for WeightedDEW exclusion (refer to paper for details).

Source code in vimms/Controller/box.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __init__(
    self,
    ionisation_mode,
    isolation_width,
    N,
    mz_tol,
    rt_tol,
    min_ms1_intensity,
    roi_params,
    grid,
    smartroi_params=None,
    min_roi_length_for_fragmentation=0,
    ms1_shift=0,
    advanced_params=None,
    register_all_roi=False,
    scoring_params=GRID_CONTROLLER_SCORING_PARAMS,
    exclusion_method=ROI_EXCLUSION_DEW,
    exclusion_t_0=None,
):
    """
    Create a grid controller.

    Args:
        ionisation_mode: ionisation mode, either POSITIVE or NEGATIVE
        isolation_width: isolation width in Dalton
        N: the number of highest-score precursor ions to fragment
        mz_tol: m/z tolerance -- m/z tolerance for dynamic exclusion window
        rt_tol: RT tolerance -- RT tolerance for dynamic exclusion window
        min_ms1_intensity: the minimum intensity to fragment a precursor ion
        roi_params: an instance of [vimms.Roi.RoiBuilderParams][] that describes
                    how to build ROIs in real time based on incoming scans.
        grid: an instance of BoxManager for exclusion/inclusion boxes.
        smartroi_params: an instance of [vimms.Roi.SmartRoiParams][]. If provided, then
                         the SmartROI rules (as described in the paper) will be used to select
                         which ROI to fragment. Otherwise set to None to use standard ROIs.
        min_roi_length_for_fragmentation: how long a ROI should be before it can be fragmented.
        ms1_shift: advanced parameter -- best to leave it.
        advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                         advanced parameters to control the mass spec. If left to None,
                         default values will be used.
        register_all_roi: whether to register all ROIs or not
        scoring_params: a dictionary of parameters used when calculating scores
        exclusion_method: an instance of [vimms.Exclusion.TopNExclusion][] or its subclasses,
                          used to describe how to perform dynamic exclusion so that precursors
                          that have been fragmented are not fragmented again.
        exclusion_t_0: parameter for WeightedDEW exclusion (refer to paper for details).
    """
    super().__init__(
        ionisation_mode,
        isolation_width,
        N,
        mz_tol,
        rt_tol,
        min_ms1_intensity,
        roi_params,
        smartroi_params=smartroi_params,
        min_roi_length_for_fragmentation=min_roi_length_for_fragmentation,
        ms1_shift=ms1_shift,
        advanced_params=advanced_params,
        exclusion_method=exclusion_method,
        exclusion_t_0=exclusion_t_0,
    )

    self.roi_builder = RoiBuilder(roi_params, smartroi_params=smartroi_params)
    self.grid = grid  # helps us understand previous RoIs
    self.register_all_roi = register_all_roi
    self.scoring_params = scoring_params

TopNExclusion

A class that perform standard dynamic exclusion for Top-N. This is based on checked whether an m/z and RT value lies in certain exclusion boxes.

Source code in vimms/Exclusion.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
class TopNExclusion:
    """
    A class that perform standard dynamic exclusion for Top-N.
    This is based on checked whether an m/z and RT value lies in certain exclusion boxes.
    """

    def __init__(
        self, mz_tol, rt_tol, exclude_after_n_times=1, exclude_t0=0, initial_exclusion_list=None
    ):
        """
        Initialise a Top-N dynamic exclusion object

        Args:
            mz_tol:
            rt_tol:
            exclude_after_n_times:
            exclude_t0:
            initial_exclusion_list:
        """
        self.mz_tol = mz_tol
        self.rt_tol = rt_tol
        self.exclude_after_n_times = exclude_after_n_times
        self.exclude_t0 = exclude_t0

        self.exclude_check = BoxHolder()
        self.dynamic_exclusion = BoxHolder()

        # Initialise 'dynamic_exclusion' with its initial value, if provided
        if initial_exclusion_list is not None:
            for initial in initial_exclusion_list:
                self.dynamic_exclusion.add_box(initial)

    def is_excluded(self, mz, rt):
        """
        Checks if a pair of (mz, rt) value is currently excluded by
        dynamic exclusion window

        Args:
            mz: m/z value
            rt: RT value
            mz_tol: m/z tolerance
            rt_tol: rt_tolerance

        Returns: True if excluded (with weight 0.0), False otherwise (weight 1.0).

        """
        # check the main dynamic exclusion list to see if this ion should be excluded
        dew_check = self.dynamic_exclusion.is_in_box(mz, rt)
        if dew_check:
            return True, 0.0

        # if not excluded, then check the initial list to see if we need to increment count
        found = False
        hits = self.exclude_check.check_point(mz, rt)
        if len(hits) > 0:  # if there are initial hits, increment them

            # here we increment all hits that contain this (mz, rt) point
            # and check if any of them has been excluded more times than the threshold
            for box in hits:
                box.increment_counter()
                if box.counter >= self.exclude_after_n_times:
                    found = True

        # if some boxes have hit threshold that were reached, exclude this ion
        if found:
            x = self._get_exclusion_item(mz, rt, self.mz_tol, self.rt_tol)
            self.dynamic_exclusion.add_box(x)
            return True, 0.0

        # finally this ion is not excluded if it is not in either the main or initial lists
        return False, 1.0

    def update(self, current_scan, ms2_tasks):
        """
        For every scheduled MS2 scan, add its precursor m/z for initial exclusion check
        A tolerance of initial_t0 is used

        Args:
            current_scan: the current MS1 scan
            ms2_tasks: scheduled ms2 tasks

        Returns: None

        """
        rt = current_scan.rt
        for task in ms2_tasks:
            for precursor in task.get("precursor_mz"):
                mz = precursor.precursor_mz

                # new way of checking DEW -- with an initial boxholder to check first
                if self.exclude_t0 > 0:
                    x = self._get_exclusion_item(mz, rt, self.mz_tol, self.exclude_t0)
                    self.exclude_check.add_box(x)

                else:  # fallback to the old way by adding directly to the DEW boxholder
                    x = self._get_exclusion_item(mz, rt, self.mz_tol, self.rt_tol)
                    self.dynamic_exclusion.add_box(x)

    def _get_exclusion_item(self, mz, rt, mz_tol, rt_tol):
        """
        Create a new [vimms.Exclusion.ExclusionItem][] object based on the (mz, rt) values
        as well as the tolerances.
        Args:
            mz: the m/z value
            rt: the RT value
            mz_tol: the m/z tolerance (in ppm)
            rt_tol: the RT tolerance (in seconds)

        Returns: a new [vimms.Exclusion.ExclusionItem][] object

        """
        mz_lower = mz * (1 - mz_tol / 1e6)
        mz_upper = mz * (1 + mz_tol / 1e6)
        rt_lower = rt - rt_tol
        # I think this is mostly for topN (iterative) exclusion method
        rt_upper = rt + rt_tol
        x = ExclusionItem(
            from_mz=mz_lower, to_mz=mz_upper, from_rt=rt_lower, to_rt=rt_upper, frag_at=rt
        )
        return x

__init__(mz_tol, rt_tol, exclude_after_n_times=1, exclude_t0=0, initial_exclusion_list=None)

Initialise a Top-N dynamic exclusion object

Parameters:
  • mz_tol
  • rt_tol
  • exclude_after_n_times
  • exclude_t0
  • initial_exclusion_list
Source code in vimms/Exclusion.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def __init__(
    self, mz_tol, rt_tol, exclude_after_n_times=1, exclude_t0=0, initial_exclusion_list=None
):
    """
    Initialise a Top-N dynamic exclusion object

    Args:
        mz_tol:
        rt_tol:
        exclude_after_n_times:
        exclude_t0:
        initial_exclusion_list:
    """
    self.mz_tol = mz_tol
    self.rt_tol = rt_tol
    self.exclude_after_n_times = exclude_after_n_times
    self.exclude_t0 = exclude_t0

    self.exclude_check = BoxHolder()
    self.dynamic_exclusion = BoxHolder()

    # Initialise 'dynamic_exclusion' with its initial value, if provided
    if initial_exclusion_list is not None:
        for initial in initial_exclusion_list:
            self.dynamic_exclusion.add_box(initial)

is_excluded(mz, rt)

Checks if a pair of (mz, rt) value is currently excluded by dynamic exclusion window

Parameters:
  • mz

    m/z value

  • rt

    RT value

  • mz_tol

    m/z tolerance

  • rt_tol

    rt_tolerance

Returns: True if excluded (with weight 0.0), False otherwise (weight 1.0).

Source code in vimms/Exclusion.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
def is_excluded(self, mz, rt):
    """
    Checks if a pair of (mz, rt) value is currently excluded by
    dynamic exclusion window

    Args:
        mz: m/z value
        rt: RT value
        mz_tol: m/z tolerance
        rt_tol: rt_tolerance

    Returns: True if excluded (with weight 0.0), False otherwise (weight 1.0).

    """
    # check the main dynamic exclusion list to see if this ion should be excluded
    dew_check = self.dynamic_exclusion.is_in_box(mz, rt)
    if dew_check:
        return True, 0.0

    # if not excluded, then check the initial list to see if we need to increment count
    found = False
    hits = self.exclude_check.check_point(mz, rt)
    if len(hits) > 0:  # if there are initial hits, increment them

        # here we increment all hits that contain this (mz, rt) point
        # and check if any of them has been excluded more times than the threshold
        for box in hits:
            box.increment_counter()
            if box.counter >= self.exclude_after_n_times:
                found = True

    # if some boxes have hit threshold that were reached, exclude this ion
    if found:
        x = self._get_exclusion_item(mz, rt, self.mz_tol, self.rt_tol)
        self.dynamic_exclusion.add_box(x)
        return True, 0.0

    # finally this ion is not excluded if it is not in either the main or initial lists
    return False, 1.0

update(current_scan, ms2_tasks)

For every scheduled MS2 scan, add its precursor m/z for initial exclusion check A tolerance of initial_t0 is used

Parameters:
  • current_scan

    the current MS1 scan

  • ms2_tasks

    scheduled ms2 tasks

Returns: None

Source code in vimms/Exclusion.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def update(self, current_scan, ms2_tasks):
    """
    For every scheduled MS2 scan, add its precursor m/z for initial exclusion check
    A tolerance of initial_t0 is used

    Args:
        current_scan: the current MS1 scan
        ms2_tasks: scheduled ms2 tasks

    Returns: None

    """
    rt = current_scan.rt
    for task in ms2_tasks:
        for precursor in task.get("precursor_mz"):
            mz = precursor.precursor_mz

            # new way of checking DEW -- with an initial boxholder to check first
            if self.exclude_t0 > 0:
                x = self._get_exclusion_item(mz, rt, self.mz_tol, self.exclude_t0)
                self.exclude_check.add_box(x)

            else:  # fallback to the old way by adding directly to the DEW boxholder
                x = self._get_exclusion_item(mz, rt, self.mz_tol, self.rt_tol)
                self.dynamic_exclusion.add_box(x)

TopN_RoiController

Bases: RoiController

A ROI-based controller that implements the Top-N selection.

Source code in vimms/Controller/roi.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
class TopN_RoiController(RoiController):
    """
    A ROI-based controller that implements the Top-N selection.
    """

    def __init__(
        self,
        ionisation_mode,
        isolation_width,
        N,
        mz_tol,
        rt_tol,
        min_ms1_intensity,
        roi_params,
        min_roi_length_for_fragmentation=0,
        ms1_shift=0,
        advanced_params=None,
        exclusion_method=ROI_EXCLUSION_DEW,
        exclusion_t_0=None,
    ):
        """
        Initialise the Top-N SmartROI controller.

        Args:
            ionisation_mode: ionisation mode, either POSITIVE or NEGATIVE
            isolation_width: isolation width in Dalton
            N: the number of highest-score precursor ions to fragment
            mz_tol: m/z tolerance -- m/z tolerance for dynamic exclusion window
            rt_tol: RT tolerance -- RT tolerance for dynamic exclusion window
            min_ms1_intensity: the minimum intensity to fragment a precursor ion
            roi_params: an instance of [vimms.Roi.RoiBuilderParams][] that describes
                        how to build ROIs in real time based on incoming scans.
            min_roi_length_for_fragmentation: how long a ROI should be before it can be fragmented.
            ms1_shift: advanced parameter -- best to leave it.
            advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                             advanced parameters to control the mass spec. If left to None,
                             default values will be used.
            exclusion_method: an instance of [vimms.Exclusion.TopNExclusion][] or its subclasses,
                              used to describe how to perform dynamic exclusion so that precursors
                              that have been fragmented are not fragmented again.
            exclusion_t_0: parameter for WeightedDEW exclusion (refer to paper for details).
        """
        super().__init__(
            ionisation_mode,
            isolation_width,
            N,
            mz_tol,
            rt_tol,
            min_ms1_intensity,
            roi_params,
            min_roi_length_for_fragmentation=min_roi_length_for_fragmentation,
            ms1_shift=ms1_shift,
            advanced_params=advanced_params,
            exclusion_method=exclusion_method,
            exclusion_t_0=exclusion_t_0,
        )

    def _get_scores(self):
        initial_scores = self._get_dda_scores()
        scores = self._get_top_N_scores(initial_scores)
        return scores

__init__(ionisation_mode, isolation_width, N, mz_tol, rt_tol, min_ms1_intensity, roi_params, min_roi_length_for_fragmentation=0, ms1_shift=0, advanced_params=None, exclusion_method=ROI_EXCLUSION_DEW, exclusion_t_0=None)

Initialise the Top-N SmartROI controller.

Parameters:
  • ionisation_mode

    ionisation mode, either POSITIVE or NEGATIVE

  • isolation_width

    isolation width in Dalton

  • N

    the number of highest-score precursor ions to fragment

  • mz_tol

    m/z tolerance -- m/z tolerance for dynamic exclusion window

  • rt_tol

    RT tolerance -- RT tolerance for dynamic exclusion window

  • min_ms1_intensity

    the minimum intensity to fragment a precursor ion

  • roi_params

    an instance of vimms.Roi.RoiBuilderParams that describes how to build ROIs in real time based on incoming scans.

  • min_roi_length_for_fragmentation

    how long a ROI should be before it can be fragmented.

  • ms1_shift

    advanced parameter -- best to leave it.

  • advanced_params

    an vimms.Controller.base.AdvancedParams object that contains advanced parameters to control the mass spec. If left to None, default values will be used.

  • exclusion_method

    an instance of vimms.Exclusion.TopNExclusion or its subclasses, used to describe how to perform dynamic exclusion so that precursors that have been fragmented are not fragmented again.

  • exclusion_t_0

    parameter for WeightedDEW exclusion (refer to paper for details).

Source code in vimms/Controller/roi.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
def __init__(
    self,
    ionisation_mode,
    isolation_width,
    N,
    mz_tol,
    rt_tol,
    min_ms1_intensity,
    roi_params,
    min_roi_length_for_fragmentation=0,
    ms1_shift=0,
    advanced_params=None,
    exclusion_method=ROI_EXCLUSION_DEW,
    exclusion_t_0=None,
):
    """
    Initialise the Top-N SmartROI controller.

    Args:
        ionisation_mode: ionisation mode, either POSITIVE or NEGATIVE
        isolation_width: isolation width in Dalton
        N: the number of highest-score precursor ions to fragment
        mz_tol: m/z tolerance -- m/z tolerance for dynamic exclusion window
        rt_tol: RT tolerance -- RT tolerance for dynamic exclusion window
        min_ms1_intensity: the minimum intensity to fragment a precursor ion
        roi_params: an instance of [vimms.Roi.RoiBuilderParams][] that describes
                    how to build ROIs in real time based on incoming scans.
        min_roi_length_for_fragmentation: how long a ROI should be before it can be fragmented.
        ms1_shift: advanced parameter -- best to leave it.
        advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                         advanced parameters to control the mass spec. If left to None,
                         default values will be used.
        exclusion_method: an instance of [vimms.Exclusion.TopNExclusion][] or its subclasses,
                          used to describe how to perform dynamic exclusion so that precursors
                          that have been fragmented are not fragmented again.
        exclusion_t_0: parameter for WeightedDEW exclusion (refer to paper for details).
    """
    super().__init__(
        ionisation_mode,
        isolation_width,
        N,
        mz_tol,
        rt_tol,
        min_ms1_intensity,
        roi_params,
        min_roi_length_for_fragmentation=min_roi_length_for_fragmentation,
        ms1_shift=ms1_shift,
        advanced_params=advanced_params,
        exclusion_method=exclusion_method,
        exclusion_t_0=exclusion_t_0,
    )

TopN_SmartRoiController

Bases: RoiController

A ROI-based controller that implements the Top-N selection with SmartROI rules. This is used in the paper 'Rapid Development ...'

Source code in vimms/Controller/roi.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
class TopN_SmartRoiController(RoiController):
    """
    A ROI-based controller that implements the Top-N selection with SmartROI rules.
    This is used in the paper 'Rapid Development ...'
    """

    def __init__(
        self,
        ionisation_mode,
        isolation_width,
        N,
        mz_tol,
        rt_tol,
        min_ms1_intensity,
        roi_params,
        smartroi_params,
        min_roi_length_for_fragmentation=0,
        ms1_shift=0,
        advanced_params=None,
        exclusion_method=ROI_EXCLUSION_DEW,
        exclusion_t_0=None,
    ):
        """
        Initialise the Top-N SmartROI controller.

        Args:
            ionisation_mode: ionisation mode, either POSITIVE or NEGATIVE
            isolation_width: isolation width in Dalton
            N: the number of highest-score precursor ions to fragment
            mz_tol: m/z tolerance -- m/z tolerance for dynamic exclusion window
            rt_tol: RT tolerance -- RT tolerance for dynamic exclusion window
            min_ms1_intensity: the minimum intensity to fragment a precursor ion
            roi_params: an instance of [vimms.Roi.RoiBuilderParams][] that describes
                        how to build ROIs in real time based on incoming scans.
            smartroi_params: an instance of [vimms.Roi.SmartRoiParams][]. If provided, then
                             the SmartROI rules (as described in the paper) will be used to select
                             which ROI to fragment. Otherwise set to None to use standard ROIs.
            min_roi_length_for_fragmentation: how long a ROI should be before it can be fragmented.
            ms1_shift: advanced parameter -- best to leave it.
            advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                             advanced parameters to control the mass spec. If left to None,
                             default values will be used.
            exclusion_method: an instance of [vimms.Exclusion.TopNExclusion][] or its subclasses,
                              used to describe how to perform dynamic exclusion so that precursors
                              that have been fragmented are not fragmented again.
            exclusion_t_0: parameter for WeightedDEW exclusion (refer to paper for details).
        """
        super().__init__(
            ionisation_mode,
            isolation_width,
            N,
            mz_tol,
            rt_tol,
            min_ms1_intensity,
            roi_params,
            smartroi_params=smartroi_params,
            min_roi_length_for_fragmentation=min_roi_length_for_fragmentation,
            ms1_shift=ms1_shift,
            advanced_params=advanced_params,
            exclusion_method=exclusion_method,
            exclusion_t_0=exclusion_t_0,
        )

    def _get_dda_scores(self):
        return self._log_roi_intensities() * self._min_intensity_filter() * self._smartroi_filter()

    def _get_scores(self):
        initial_scores = self._get_dda_scores()
        scores = self._get_top_N_scores(initial_scores)
        return scores

__init__(ionisation_mode, isolation_width, N, mz_tol, rt_tol, min_ms1_intensity, roi_params, smartroi_params, min_roi_length_for_fragmentation=0, ms1_shift=0, advanced_params=None, exclusion_method=ROI_EXCLUSION_DEW, exclusion_t_0=None)

Initialise the Top-N SmartROI controller.

Parameters:
  • ionisation_mode

    ionisation mode, either POSITIVE or NEGATIVE

  • isolation_width

    isolation width in Dalton

  • N

    the number of highest-score precursor ions to fragment

  • mz_tol

    m/z tolerance -- m/z tolerance for dynamic exclusion window

  • rt_tol

    RT tolerance -- RT tolerance for dynamic exclusion window

  • min_ms1_intensity

    the minimum intensity to fragment a precursor ion

  • roi_params

    an instance of vimms.Roi.RoiBuilderParams that describes how to build ROIs in real time based on incoming scans.

  • smartroi_params

    an instance of vimms.Roi.SmartRoiParams. If provided, then the SmartROI rules (as described in the paper) will be used to select which ROI to fragment. Otherwise set to None to use standard ROIs.

  • min_roi_length_for_fragmentation

    how long a ROI should be before it can be fragmented.

  • ms1_shift

    advanced parameter -- best to leave it.

  • advanced_params

    an vimms.Controller.base.AdvancedParams object that contains advanced parameters to control the mass spec. If left to None, default values will be used.

  • exclusion_method

    an instance of vimms.Exclusion.TopNExclusion or its subclasses, used to describe how to perform dynamic exclusion so that precursors that have been fragmented are not fragmented again.

  • exclusion_t_0

    parameter for WeightedDEW exclusion (refer to paper for details).

Source code in vimms/Controller/roi.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
def __init__(
    self,
    ionisation_mode,
    isolation_width,
    N,
    mz_tol,
    rt_tol,
    min_ms1_intensity,
    roi_params,
    smartroi_params,
    min_roi_length_for_fragmentation=0,
    ms1_shift=0,
    advanced_params=None,
    exclusion_method=ROI_EXCLUSION_DEW,
    exclusion_t_0=None,
):
    """
    Initialise the Top-N SmartROI controller.

    Args:
        ionisation_mode: ionisation mode, either POSITIVE or NEGATIVE
        isolation_width: isolation width in Dalton
        N: the number of highest-score precursor ions to fragment
        mz_tol: m/z tolerance -- m/z tolerance for dynamic exclusion window
        rt_tol: RT tolerance -- RT tolerance for dynamic exclusion window
        min_ms1_intensity: the minimum intensity to fragment a precursor ion
        roi_params: an instance of [vimms.Roi.RoiBuilderParams][] that describes
                    how to build ROIs in real time based on incoming scans.
        smartroi_params: an instance of [vimms.Roi.SmartRoiParams][]. If provided, then
                         the SmartROI rules (as described in the paper) will be used to select
                         which ROI to fragment. Otherwise set to None to use standard ROIs.
        min_roi_length_for_fragmentation: how long a ROI should be before it can be fragmented.
        ms1_shift: advanced parameter -- best to leave it.
        advanced_params: an [vimms.Controller.base.AdvancedParams][] object that contains
                         advanced parameters to control the mass spec. If left to None,
                         default values will be used.
        exclusion_method: an instance of [vimms.Exclusion.TopNExclusion][] or its subclasses,
                          used to describe how to perform dynamic exclusion so that precursors
                          that have been fragmented are not fragmented again.
        exclusion_t_0: parameter for WeightedDEW exclusion (refer to paper for details).
    """
    super().__init__(
        ionisation_mode,
        isolation_width,
        N,
        mz_tol,
        rt_tol,
        min_ms1_intensity,
        roi_params,
        smartroi_params=smartroi_params,
        min_roi_length_for_fragmentation=min_roi_length_for_fragmentation,
        ms1_shift=ms1_shift,
        advanced_params=advanced_params,
        exclusion_method=exclusion_method,
        exclusion_t_0=exclusion_t_0,
    )

WeightedDEWController

Bases: TopNController

A variant of the Top-N controller, but it uses a linear weight for dynamic exclusion window rather than a True/False indicator on whether a certain precursor ion is excluded or not. For more details, refer to our paper.

Source code in vimms/Controller/topN.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
class WeightedDEWController(TopNController):
    """
    A variant of the Top-N controller, but it uses a linear weight
     for dynamic exclusion window rather than a True/False indicator on whether
     a certain precursor ion is excluded or not. For more details, refer to our paper.
    """

    def __init__(
        self,
        ionisation_mode,
        N,
        isolation_width,
        mz_tol,
        rt_tol,
        min_ms1_intensity,
        ms1_shift=0,
        exclusion_t_0=15,
        log_intensity=False,
        advanced_params=None,
    ):
        super().__init__(
            ionisation_mode,
            N,
            isolation_width,
            mz_tol,
            rt_tol,
            min_ms1_intensity,
            ms1_shift=ms1_shift,
            advanced_params=advanced_params,
        )
        self.log_intensity = log_intensity
        self.exclusion = WeightedDEWExclusion(mz_tol, rt_tol, exclusion_t_0)

    def _process_scan(self, scan):
        # if there's a previous ms1 scan to process
        new_tasks = []
        fragmented_count = 0
        if self.scan_to_process is not None:
            mzs = self.scan_to_process.mzs
            intensities = self.scan_to_process.intensities
            rt = self.scan_to_process.rt

            if not self.log_intensity:
                mzi = [
                    ScanItem(mz, intensities[i])
                    for i, mz in enumerate(mzs)
                    if intensities[i] >= self.min_ms1_intensity
                ]
            else:
                # take log of intensities for peak scoring
                mzi = [
                    ScanItem(mz, np.log(intensities[i]))
                    for i, mz in enumerate(mzs)
                    if intensities[i] >= self.min_ms1_intensity
                ]

            for si in mzi:
                is_exc, weight = self.exclusion.is_excluded(si.mz, rt)
                si.weight = weight

            mzi.sort(reverse=True)

            done_ms1 = False
            ms2_tasks = []
            for i in range(len(mzi)):
                # mz = mzi[i].mz
                # intensity = mzi[i].intensity
                # stopping criteria is after we've fragmented N ions or we
                # found ion < min_intensity
                if fragmented_count >= self.N:
                    break

                mz = mzi[i].mz
                if not self.log_intensity:
                    intensity = mzi[i].intensity
                else:
                    intensity = np.exp(mzi[i].intensity)

                # if 138 <= mz <= 138.5:
                #     print(mz,intensity,mzi[i].weight)

                if mzi[i].weight == 0.0:
                    logger.debug(
                        "Time %f no ions left reached at %f, %d"
                        % (rt, intensity, fragmented_count)
                    )
                    break

                # create a new ms2 scan parameter to be sent to the mass spec
                precursor_scan_id = self.scan_to_process.scan_id
                dda_scan_params = self.get_ms2_scan_params(
                    mz,
                    intensity,
                    precursor_scan_id,
                    self.isolation_width,
                    self.mz_tol,
                    self.rt_tol,
                )
                new_tasks.append(dda_scan_params)
                ms2_tasks.append(dda_scan_params)
                fragmented_count += 1
                self.current_task_id += 1

                # add an ms1 here
                if fragmented_count == self.N - self.ms1_shift:
                    ms1_scan_params = self.get_ms1_scan_params()
                    self.current_task_id += 1
                    self.next_processed_scan_id = self.current_task_id
                    new_tasks.append(ms1_scan_params)
                    done_ms1 = True

            # if no ms1 has been added, then add at the end
            if not done_ms1:
                # if fragmented_count < self.N - self.ms1_shift:
                ms1_scan_params = self.get_ms1_scan_params()
                self.current_task_id += 1
                self.next_processed_scan_id = self.current_task_id
                new_tasks.append(ms1_scan_params)

            # create new exclusion items based on the scheduled ms2 tasks
            self.exclusion.update(self.scan_to_process, ms2_tasks)

            # set this ms1 scan as has been processed
            self.scan_to_process = None
        return new_tasks

WeightedDEWExclusion

Bases: TopNExclusion

A class that perform weighted dynamic exclusion for Top-N. This is further described in our paper 'Rapid Development ...'

Source code in vimms/Exclusion.py
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
class WeightedDEWExclusion(TopNExclusion):
    """
    A class that perform weighted dynamic exclusion for Top-N.
    This is further described in our paper 'Rapid Development ...'
    """

    def __init__(self, mz_tol, rt_tol, exclusion_t_0):
        """
        Initialises a weighted dynamic exclusion object
        Args:
            rt_tol: the RT tolerance (in seconds)
            exclusion_t_0: WeightedDEW parameter
        """
        super().__init__(mz_tol, rt_tol)
        self.exclusion_t_0 = exclusion_t_0
        if self.exclusion_t_0 > self.rt_tol:
            raise ValueError("exclusion_t_0 must be lte rt_tol")

    def is_excluded(self, mz, rt):
        boxes = self.dynamic_exclusion.check_point(mz, rt)
        if len(boxes) > 0:
            # compute weights for all the boxes that contain this (mz, rt)
            weights = []
            for b in boxes:
                _, w = compute_weight(rt, b.frag_at, self.rt_tol, self.exclusion_t_0)
                weights.append(w)

            # use the min weight -- seems to work well
            w = min(weights)
            flag = False if w == 1.0 else True
            return flag, w

        else:
            return False, 1.0

__init__(mz_tol, rt_tol, exclusion_t_0)

Initialises a weighted dynamic exclusion object Args: rt_tol: the RT tolerance (in seconds) exclusion_t_0: WeightedDEW parameter

Source code in vimms/Exclusion.py
383
384
385
386
387
388
389
390
391
392
393
def __init__(self, mz_tol, rt_tol, exclusion_t_0):
    """
    Initialises a weighted dynamic exclusion object
    Args:
        rt_tol: the RT tolerance (in seconds)
        exclusion_t_0: WeightedDEW parameter
    """
    super().__init__(mz_tol, rt_tol)
    self.exclusion_t_0 = exclusion_t_0
    if self.exclusion_t_0 > self.rt_tol:
        raise ValueError("exclusion_t_0 must be lte rt_tol")

WeightedDEWFilter

Bases: ScoreFilter

A class that implements weighted dynamic exclusion filter

Source code in vimms/Exclusion.py
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
class WeightedDEWFilter(ScoreFilter):
    """
    A class that implements weighted dynamic exclusion filter
    """

    def __init__(self, exclusion):
        """
        Initialises a weighted dynamic exclusion filter

        Args:
            exclusion: a [vimms.Exclusion.ExclusionItem][] object
        """
        self.exclusion = exclusion

    def filter(self, current_rt, rois):
        """
        Check whether ROIs are excluded or not based on weighted dynamic exclusion filter
        Args:
            current_rt: the current RT value
            rois: a list of [vimms.Roi.Roi][] objects.

        Returns: a numpy array of weights for each ROI.

        """
        weights = []
        for roi in rois:
            last_mz, last_rt, last_intensity = roi.get_last_datum()
            is_exc, weight = self.exclusion.is_excluded(last_mz, last_rt)
            weights.append(weight)
        return np.array(weights)

__init__(exclusion)

Initialises a weighted dynamic exclusion filter

Parameters:
Source code in vimms/Exclusion.py
522
523
524
525
526
527
528
529
def __init__(self, exclusion):
    """
    Initialises a weighted dynamic exclusion filter

    Args:
        exclusion: a [vimms.Exclusion.ExclusionItem][] object
    """
    self.exclusion = exclusion

filter(current_rt, rois)

Check whether ROIs are excluded or not based on weighted dynamic exclusion filter Args: current_rt: the current RT value rois: a list of vimms.Roi.Roi objects.

Returns: a numpy array of weights for each ROI.

Source code in vimms/Exclusion.py
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
def filter(self, current_rt, rois):
    """
    Check whether ROIs are excluded or not based on weighted dynamic exclusion filter
    Args:
        current_rt: the current RT value
        rois: a list of [vimms.Roi.Roi][] objects.

    Returns: a numpy array of weights for each ROI.

    """
    weights = []
    for roi in rois:
        last_mz, last_rt, last_intensity = roi.get_last_datum()
        is_exc, weight = self.exclusion.is_excluded(last_mz, last_rt)
        weights.append(weight)
    return np.array(weights)

WrapperController

Bases: Controller

Template for controller which wraps behaviour of at least one other controller.

Source code in vimms/Controller/base.py
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
class WrapperController(Controller):
    """
    Template for controller which wraps behaviour of at least one other
    controller.
    """

    def __init__(self):
        self.__dict__.update(self.controller.__dict__)

    def get_ms1_scan_params(self, metadata=None):
        val = self.controller.get_ms1_scan_params(metadata=None)
        self.__dict__.update(self.controller.__dict__)
        return val

    def get_ms2_scan_params(
        self, mz, intensity, precursor_scan_id, isolation_width, mz_tol, rt_tol, metadata=None
    ):

        val = self.controller.get_ms2_scan_params(
            mz, intensity, precursor_scan_id, isolation_width, mz_tol, rt_tol, metadata=None
        )
        self.__dict__.update(self.controller.__dict__)
        return val

    def get_initial_tasks(self):
        val = self.controller.get_initial_tasks()
        self.__dict__.update(self.controller.__dict__)
        return val

    def get_initial_scan_params(self):
        val = self.controller.get_initial_scan_params()
        self.__dict__.update(self.controller.__dict__)
        return val

    def set_environment(self, env):
        val = self.controller.set_environment(env)
        self.__dict__.update(self.controller.__dict__)
        return val

    def handle_scan(self, scan, current_size, pending_size):
        val = self.controller.handle_scan(scan, current_size, pending_size)
        self.__dict__.update(self.controller.__dict__)
        return val

    def update_state_after_scan(self, last_scan):
        self.controller.update_state_after_scan(last_scan)
        self.__dict__.update(self.controller.__dict__)

    def _process_scan(self, scan):
        val = self.controller._process_scan(scan)
        self.__dict__.update(self.controller.__dict__)
        return val

    def dump_scans(self, output_method):
        self.controller.dump_scans(output_method)
        self.__dict__.update(self.controller.__dict__)

    def after_injection_cleanup(self):
        self.controller.after_injection_cleanup()
        self.__dict__.update(self.controller.__dict__)

create_if_not_exist(out_dir)

Creates a directory if it doesn't already exist Args: out_dir: the directory to create, if it doesn't exist

Returns: None.

Source code in vimms/Common.py
428
429
430
431
432
433
434
435
436
437
438
439
def create_if_not_exist(out_dir):
    """
    Creates a directory if it doesn't already exist
    Args:
        out_dir: the directory to create, if it doesn't exist

    Returns: None.

    """
    if not pathlib.Path(out_dir).exists():
        logger.info("Created %s" % out_dir)
        pathlib.Path(out_dir).mkdir(parents=True, exist_ok=True)

create_targets_from_toxid(toxid_file_name, file_rt_units='minutes', mz_delta=10, rt_delta=60.0, polarity_filter=['+'], adducts_to_use=['[M+H]+', '[M+K]+', '[M+Na]+'])

Note: mz_delta is in ppm

Source code in vimms/Controller/targeted.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def create_targets_from_toxid(
    toxid_file_name,
    file_rt_units="minutes",
    mz_delta=10,
    rt_delta=60.0,
    polarity_filter=["+"],
    adducts_to_use=["[M+H]+", "[M+K]+", "[M+Na]+"],
):
    """
    Note: mz_delta is in ppm
    """
    target_list = []

    with open(str(toxid_file_name), "r") as f:
        reader = csv.reader(f)
        line = [None]
        while len(line) == 0 or not line[0] == "Index":
            line = next(reader)
        # we will now be in the data
        at = AdductTransformer()

        for line in reader:
            if len(line) == 0 or line[0] == "-":  # empty line, or undetected compound
                continue
            name = line[1]
            formula = line[2]
            polarity = line[3]
            if polarity not in polarity_filter:
                continue
            expected_rt = float(line[5])
            if file_rt_units == "minutes":
                expected_rt *= 60.0
            for val in line[8:]:
                assert val == "-" or val == ""
            metadata = {
                "name": name,
                "formula": formula,
                "polarity": polarity,
                "expected_rt": expected_rt,
            }

            for adduct in adducts_to_use:
                theoretical_mz = at.mass2ion(Formula(formula).isotope.mass, adduct)
                min_mz = theoretical_mz - theoretical_mz * mz_delta / 1e6
                max_mz = theoretical_mz + theoretical_mz * mz_delta / 1e6
                min_rt = expected_rt - rt_delta
                max_rt = expected_rt + rt_delta
                new_target = Target(
                    theoretical_mz,
                    min_mz,
                    max_mz,
                    min_rt,
                    max_rt,
                    name=name,
                    metadata=metadata,
                    adduct=adduct,
                )
                target_list.append(new_target)

    return target_list

get_dda_scan_param(mz, intensity, precursor_scan_id, isolation_width, mz_tol, rt_tol, agc_target=DEFAULT_MS2_AGC_TARGET, max_it=DEFAULT_MS2_MAXIT, collision_energy=DEFAULT_MS2_COLLISION_ENERGY, source_cid_energy=DEFAULT_SOURCE_CID_ENERGY, orbitrap_resolution=DEFAULT_MS2_ORBITRAP_RESOLUTION, mass_analyser=DEFAULT_MS2_MASS_ANALYSER, activation_type=DEFAULT_MS1_ACTIVATION_TYPE, isolation_mode=DEFAULT_MS2_ISOLATION_MODE, polarity=POSITIVE, metadata=None, scan_id=None)

Generate the default MS2 scan parameters.

Parameters:
  • mz

    m/z of precursor peak to fragment

  • intensity

    intensity of precursor peak to fragment

  • precursor_scan_id

    scan ID of the MS1 scan containing the precursor peak

  • isolation_width

    isolation width, in Dalton

  • mz_tol

    m/z tolerance for dynamic exclusion # FIXME: this shouldn't be here

  • rt_tol

    RT tolerance for dynamic exclusion # FIXME: this shouldn't be here

  • agc_target

    AGC (automatic gain control) target

  • max_it

    maximum time to collect ion

  • collision_energy

    the collision energy to use

  • source_cid_energy

    source CID energy

  • orbitrap_resolution

    resolution of the mass-spec (Orbitrap) instrument

  • mass_analyser

    which mass analyser to use

  • activation_type

    activation type, either HCD or CID

  • isolation_mode

    isolation mode, either None, or Quadrupole or IonTrap

  • polarity

    the polarity value, either POSITIVE or NEGATIVE

  • metadata

    additional metadata to include in this scan

  • scan_id

    the scan ID, if specified

Returns: the parameters of the MS2 scan to create

Source code in vimms/Common.py
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
def get_dda_scan_param(
    mz,
    intensity,
    precursor_scan_id,
    isolation_width,
    mz_tol,
    rt_tol,
    agc_target=DEFAULT_MS2_AGC_TARGET,
    max_it=DEFAULT_MS2_MAXIT,
    collision_energy=DEFAULT_MS2_COLLISION_ENERGY,
    source_cid_energy=DEFAULT_SOURCE_CID_ENERGY,
    orbitrap_resolution=DEFAULT_MS2_ORBITRAP_RESOLUTION,
    mass_analyser=DEFAULT_MS2_MASS_ANALYSER,
    activation_type=DEFAULT_MS1_ACTIVATION_TYPE,
    isolation_mode=DEFAULT_MS2_ISOLATION_MODE,
    polarity=POSITIVE,
    metadata=None,
    scan_id=None,
):
    """
    Generate the default MS2 scan parameters.

    Args:
        mz: m/z of precursor peak to fragment
        intensity: intensity of precursor peak to fragment
        precursor_scan_id: scan ID of the MS1 scan containing the precursor peak
        isolation_width: isolation width, in Dalton
        mz_tol: m/z tolerance for dynamic exclusion # FIXME: this shouldn't be here
        rt_tol: RT tolerance for dynamic exclusion # FIXME: this shouldn't be here
        agc_target: AGC (automatic gain control) target
        max_it: maximum time to collect ion
        collision_energy: the collision energy to use
        source_cid_energy: source CID energy
        orbitrap_resolution: resolution of the mass-spec (Orbitrap) instrument
        mass_analyser: which mass analyser to use
        activation_type: activation type, either HCD or CID
        isolation_mode: isolation mode, either None, or Quadrupole or IonTrap
        polarity: the polarity value, either POSITIVE or NEGATIVE
        metadata: additional metadata to include in this scan
        scan_id: the scan ID, if specified

    Returns: the parameters of the MS2 scan to create

    """

    dda_scan_params = ScanParameters()
    dda_scan_params.set(ScanParameters.MS_LEVEL, 2)

    assert isinstance(mz, list) == isinstance(intensity, list)

    # create precursor object, assume it's all singly charged
    precursor_charge = +1 if (polarity == POSITIVE) else -1
    if isinstance(mz, list):
        precursor_list = []
        for i, m in enumerate(mz):
            precursor_list.append(
                Precursor(
                    precursor_mz=m,
                    precursor_intensity=intensity[i],
                    precursor_charge=precursor_charge,
                    precursor_scan_id=precursor_scan_id,
                )
            )
        dda_scan_params.set(ScanParameters.PRECURSOR_MZ, precursor_list)

        if isinstance(isolation_width, list):
            assert len(isolation_width) == len(precursor_list)
        else:
            isolation_width = [isolation_width for m in mz]
        dda_scan_params.set(ScanParameters.ISOLATION_WIDTH, isolation_width)

    else:
        precursor = Precursor(
            precursor_mz=mz,
            precursor_intensity=intensity,
            precursor_charge=precursor_charge,
            precursor_scan_id=precursor_scan_id,
        )
        precursor_list = [precursor]
        dda_scan_params.set(ScanParameters.PRECURSOR_MZ, precursor_list)

        # set the full-width isolation width, in Da
        # if mz is not a list, neither should isolation_width be
        assert not isinstance(isolation_width, list)
        isolation_width = [isolation_width]
        dda_scan_params.set(ScanParameters.ISOLATION_WIDTH, isolation_width)

    # define dynamic exclusion parameters
    dda_scan_params.set(ScanParameters.DYNAMIC_EXCLUSION_MZ_TOL, mz_tol)
    dda_scan_params.set(ScanParameters.DYNAMIC_EXCLUSION_RT_TOL, rt_tol)

    # define other fragmentation parameters
    dda_scan_params.set(ScanParameters.COLLISION_ENERGY, collision_energy)
    dda_scan_params.set(ScanParameters.ORBITRAP_RESOLUTION, orbitrap_resolution)
    dda_scan_params.set(ScanParameters.ACTIVATION_TYPE, activation_type)
    dda_scan_params.set(ScanParameters.MASS_ANALYSER, mass_analyser)
    dda_scan_params.set(ScanParameters.ISOLATION_MODE, isolation_mode)
    dda_scan_params.set(ScanParameters.AGC_TARGET, agc_target)
    dda_scan_params.set(ScanParameters.MAX_IT, max_it)
    dda_scan_params.set(ScanParameters.SOURCE_CID_ENERGY, source_cid_energy)
    dda_scan_params.set(ScanParameters.POLARITY, polarity)
    dda_scan_params.set(ScanParameters.FIRST_MASS, DEFAULT_MSN_SCAN_WINDOW[0])
    dda_scan_params.set(ScanParameters.METADATA, metadata)
    dda_scan_params.set(ScanParameters.SCAN_ID, scan_id)

    # dynamically scale the upper mass
    charge = 1
    wiggle_room = 1.1
    max_precursor_mz = max(
        [(p.precursor_mz + isol / 2) for (p, isol) in zip(precursor_list, isolation_width)]
    )
    last_mass = max_precursor_mz * charge * wiggle_room
    dda_scan_params.set(ScanParameters.LAST_MASS, last_mass)
    return dda_scan_params

get_default_scan_params(polarity=POSITIVE, agc_target=DEFAULT_MS1_AGC_TARGET, max_it=DEFAULT_MS1_MAXIT, collision_energy=DEFAULT_MS1_COLLISION_ENERGY, source_cid_energy=DEFAULT_SOURCE_CID_ENERGY, orbitrap_resolution=DEFAULT_MS1_ORBITRAP_RESOLUTION, default_ms1_scan_window=DEFAULT_MS1_SCAN_WINDOW, mass_analyser=DEFAULT_MS1_MASS_ANALYSER, activation_type=DEFAULT_MS1_ACTIVATION_TYPE, isolation_mode=DEFAULT_MS1_ISOLATION_MODE, metadata=None, scan_id=None)

Generate the default MS1 scan parameters.

Parameters:
  • polarity

    the polarity value, either POSITIVE or NEGATIVE

  • agc_target

    AGC (automatic gain control) target

  • max_it

    maximum time to collect ion

  • collision_energy

    the collision energy to use

  • source_cid_energy

    source CID energy

  • orbitrap_resolution

    resolution of the mass-spec (Orbitrap) instrument

  • default_ms1_scan_window

    the default MS1 scan window

  • mass_analyser

    which mass analyser to use

  • activation_type

    activation type, either HCD or CID

  • isolation_mode

    isolation mode, either None, or Quadrupole or IonTrap

  • metadata

    additional metadata to include in this scan

  • scan_id

    the scan ID, if specified

Returns: the parameters of the MS1 scan to create

Source code in vimms/Common.py
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
def get_default_scan_params(
    polarity=POSITIVE,
    agc_target=DEFAULT_MS1_AGC_TARGET,
    max_it=DEFAULT_MS1_MAXIT,
    collision_energy=DEFAULT_MS1_COLLISION_ENERGY,
    source_cid_energy=DEFAULT_SOURCE_CID_ENERGY,
    orbitrap_resolution=DEFAULT_MS1_ORBITRAP_RESOLUTION,
    default_ms1_scan_window=DEFAULT_MS1_SCAN_WINDOW,
    mass_analyser=DEFAULT_MS1_MASS_ANALYSER,
    activation_type=DEFAULT_MS1_ACTIVATION_TYPE,
    isolation_mode=DEFAULT_MS1_ISOLATION_MODE,
    metadata=None,
    scan_id=None,
):
    """
    Generate the default MS1 scan parameters.

    Args:
        polarity: the polarity value, either POSITIVE or NEGATIVE
        agc_target: AGC (automatic gain control) target
        max_it: maximum time to collect ion
        collision_energy: the collision energy to use
        source_cid_energy: source CID energy
        orbitrap_resolution: resolution of the mass-spec (Orbitrap) instrument
        default_ms1_scan_window: the default MS1 scan window
        mass_analyser: which mass analyser to use
        activation_type: activation type, either HCD or CID
        isolation_mode: isolation mode, either None, or Quadrupole or IonTrap
        metadata: additional metadata to include in this scan
        scan_id: the scan ID, if specified

    Returns: the parameters of the MS1 scan to create

    """
    default_scan_params = ScanParameters()
    default_scan_params.set(ScanParameters.MS_LEVEL, 1)
    default_scan_params.set(ScanParameters.ISOLATION_WINDOWS, [[default_ms1_scan_window]])
    default_scan_params.set(ScanParameters.ISOLATION_WIDTH, DEFAULT_ISOLATION_WIDTH)
    default_scan_params.set(ScanParameters.COLLISION_ENERGY, collision_energy)
    default_scan_params.set(ScanParameters.ORBITRAP_RESOLUTION, orbitrap_resolution)
    default_scan_params.set(ScanParameters.ACTIVATION_TYPE, activation_type)
    default_scan_params.set(ScanParameters.MASS_ANALYSER, mass_analyser)
    default_scan_params.set(ScanParameters.ISOLATION_MODE, isolation_mode)
    default_scan_params.set(ScanParameters.AGC_TARGET, agc_target)
    default_scan_params.set(ScanParameters.MAX_IT, max_it)
    default_scan_params.set(ScanParameters.SOURCE_CID_ENERGY, source_cid_energy)
    default_scan_params.set(ScanParameters.POLARITY, polarity)
    default_scan_params.set(ScanParameters.FIRST_MASS, default_ms1_scan_window[0])
    default_scan_params.set(ScanParameters.LAST_MASS, default_ms1_scan_window[1])
    default_scan_params.set(ScanParameters.METADATA, metadata)
    default_scan_params.set(ScanParameters.SCAN_ID, scan_id)
    return default_scan_params