Documentation for Exclusion.py

This file implements various exclusion and filtering criteria in a modular way that can be reused elsewhere, e.g. in controllers.

BoxHolder

A class to allow quick lookup of boxes (e.g. exclusion items, targets, etc). Creates an interval tree on mz as this is likely to narrow things down quicker. Also has a method for returning an rt interval tree for a particular mz and an mz interval tree for a particular RT.

Source code in vimms/Exclusion.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
class BoxHolder:
    """
    A container that allows quick lookup of boxes (e.g. exclusion items,
    targets, etc).

    Each box is stored in two interval trees, one keyed on its m/z range
    and one keyed on its RT range. Point lookups go through the m/z tree
    first as this is likely to narrow things down quicker. Also has
    methods for returning the subset of boxes active at a particular m/z
    and the subset of boxes active at a particular RT.
    """

    def __init__(self):
        """
        Initialise an empty BoxHolder object
        """
        self.boxes_mz = IntervalTree()
        self.boxes_rt = IntervalTree()

    def __iter__(self):
        """
        Iterate over the stored boxes, as held by the RT interval tree
        """
        return (inv.data for inv in self.boxes_rt.items())

    def add_box(self, box):
        """
        Add a box to both interval trees

        Args:
            box: the box to add. Its `from_mz`, `to_mz`, `from_rt` and
                 `to_rt` attributes define the stored intervals.

        Returns: None

        """
        # read every bound before inserting, so a box that lacks an
        # attribute is not left in only one of the two trees
        mz_from, mz_to = box.from_mz, box.to_mz
        rt_from, rt_to = box.from_rt, box.to_rt
        self.boxes_mz.addi(mz_from, mz_to, box)
        self.boxes_rt.addi(rt_from, rt_to, box)

    def check_point(self, mz, rt):
        """
        Find the boxes that match this mz and rt value

        Args:
            mz: the m/z to check
            rt: the RT to check

        Returns: the set of hits (boxes that contain this point)

        """
        # narrow down by m/z using the tree, then let each candidate box
        # decide for itself whether the RT matches
        return {r.data for r in self.boxes_mz.at(mz) if r.data.rt_match(rt)}

    # FIXME: this produces a different result from check_point, do not use yet
    # NOTE(review): presumably the results differ because the two trees hold
    # different intervals for the same box (m/z bounds vs RT bounds), so
    # intersecting the interval sets does not intersect the boxes -- confirm
    # before reviving this method.
    # def check_point_2(self, mz, rt):
    #     """
    #     An alternative method that searches both trees
    #     Might be faster if there are lots of rt ranges that
    #     can map to a particular mz value
    #     """
    #     mz_regions = self.boxes_mz.at(mz)
    #     rt_regions = self.boxes_rt.at(rt)
    #     inter = mz_regions.intersection(rt_regions)
    #     return [r.data for r in inter]

    def is_in_box(self, mz, rt):
        """
        Check if this mz and rt is in *any* box

        Args:
            mz: the m/z to check
            rt: the RT to check

        Returns: True if it's a match, False otherwise.

        """
        return len(self.check_point(mz, rt)) > 0

    def is_in_box_mz(self, mz):
        """
        Check if an mz value is in any box

        Args:
            mz: the m/z to check

        Returns: True if it's a match, False otherwise.

        """
        return len(self.boxes_mz.at(mz)) > 0

    def is_in_box_rt(self, rt):
        """
        Check if an RT value is in any box

        Args:
            rt: the RT to check

        Returns: True if it's a match, False otherwise.

        """
        return len(self.boxes_rt.at(rt)) > 0

    def get_subset_rt(self, rt):
        """
        Collect all boxes active at rt into a new BoxHolder

        Args:
            rt: the RT to check

        Returns: a new BoxHolder object containing the subset of boxes at RT

        """
        regions = self.boxes_rt.at(rt)
        subset = BoxHolder()
        for r in regions:
            subset.add_box(r.data)
        return subset

    def get_subset_mz(self, mz):
        """
        Collect all boxes active at m/z into a new BoxHolder

        Args:
            mz: the m/z value to check

        Returns: a new BoxHolder object containing the subset of boxes at m/z

        """
        regions = self.boxes_mz.at(mz)
        subset = BoxHolder()
        for r in regions:
            subset.add_box(r.data)
        return subset

__init__()

Initialise a BoxHolder object

Source code in vimms/Exclusion.py
114
115
116
117
118
119
def __init__(self):
    """
    Initialise an empty BoxHolder object.

    Creates the two (initially empty) interval trees, `boxes_mz` and
    `boxes_rt`, that hold the boxes.
    """
    self.boxes_mz = IntervalTree()
    self.boxes_rt = IntervalTree()

add_box(box)

Add a box to the IntervalTree

Parameters:
  • box

    the box to add

Returns: None

Source code in vimms/Exclusion.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def add_box(self, box):
    """
    Insert a box into both the m/z and the RT interval trees

    Args:
        box: the box to add. Its `from_mz`, `to_mz`, `from_rt` and
             `to_rt` attributes define the stored intervals.

    Returns: None

    """
    # read every bound up front, before either tree is modified
    mz_lo, mz_hi, rt_lo, rt_hi = box.from_mz, box.to_mz, box.from_rt, box.to_rt
    self.boxes_mz.addi(mz_lo, mz_hi, box)
    self.boxes_rt.addi(rt_lo, rt_hi, box)

check_point(mz, rt)

Find the boxes that match this mz and rt value

Parameters:
  • mz

    the m/z to check

  • rt

    the RT to check

Returns: the set of hits (boxes that contain this point)

Source code in vimms/Exclusion.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def check_point(self, mz, rt):
    """
    Look up every box that contains the given (mz, rt) point

    Args:
        mz: the m/z to check
        rt: the RT to check

    Returns: the set of hits (boxes that contain this point)

    """
    # candidates come from the m/z tree; each candidate box then checks the RT itself
    return {
        candidate.data
        for candidate in self.boxes_mz.at(mz)
        if candidate.data.rt_match(rt)
    }

get_subset_mz(mz)

Create a new BoxHolder containing all boxes active at the given m/z

Parameters:
  • mz

    the m/z value to check

Returns: a new BoxHolder object containing the subset of boxes at m/z

Source code in vimms/Exclusion.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def get_subset_mz(self, mz):
    """
    Collect all boxes active at the given m/z into a new BoxHolder

    Args:
        mz: the m/z value to check

    Returns: a new BoxHolder object containing the subset of boxes at m/z

    """
    matches = self.boxes_mz.at(mz)
    subset = BoxHolder()
    for interval in matches:
        subset.add_box(interval.data)
    return subset

get_subset_rt(rt)

Create a new BoxHolder containing all boxes active at the given RT

Parameters:
  • rt

    the RT to check

Returns: a new BoxHolder object containing the subset of boxes at RT

Source code in vimms/Exclusion.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def get_subset_rt(self, rt):
    """
    Collect all boxes active at the given RT into a new BoxHolder

    Args:
        rt: the RT to check

    Returns: a new BoxHolder object containing the subset of boxes at RT

    """
    matches = self.boxes_rt.at(rt)
    subset = BoxHolder()
    for interval in matches:
        subset.add_box(interval.data)
    return subset

is_in_box(mz, rt)

Check if this mz and rt is in any box

Parameters:
  • mz

    the m/z to check

  • rt

    the RT to check

Returns: True if it's a match, False otherwise.

Source code in vimms/Exclusion.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def is_in_box(self, mz, rt):
    """
    Tell whether the (mz, rt) point lies in *any* stored box

    Args:
        mz: the m/z to check
        rt: the RT to check

    Returns: True if it's a match, False otherwise.

    """
    return len(self.check_point(mz, rt)) > 0

is_in_box_mz(mz)

Check if an mz value is in any box

Parameters:
  • mz

    the m/z to check

Returns: True if it's a match, False otherwise.

Source code in vimms/Exclusion.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def is_in_box_mz(self, mz):
    """
    Tell whether an m/z value falls inside the m/z range of any box

    Args:
        mz: the m/z to check

    Returns: True if it's a match, False otherwise.

    """
    return len(self.boxes_mz.at(mz)) > 0

is_in_box_rt(rt)

Check if an RT value is in any box

Parameters:
  • rt

    the RT to check

Returns: True if it's a match, False otherwise.

Source code in vimms/Exclusion.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def is_in_box_rt(self, rt):
    """
    Tell whether an RT value falls inside the RT range of any box

    Args:
        rt: the RT to check

    Returns: True if it's a match, False otherwise.

    """
    return len(self.boxes_rt.at(rt)) > 0

DEWFilter

Bases: ScoreFilter

A class that implements dynamic exclusion filter

Source code in vimms/Exclusion.py
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
class DEWFilter(ScoreFilter):
    """
    A class that implements a dynamic exclusion filter based on the time
    elapsed since each ROI was last fragmented
    """

    def __init__(self, rt_tol):
        """
        Initialises a dynamic exclusion filter based on time only

        Args:
            rt_tol: the RT tolerance (in seconds)
        """
        self.rt_tol = rt_tol

    def filter(self, current_rt, rois):
        """
        Check which ROIs are outside the dynamic exclusion window

        Args:
            current_rt: the current RT value
            rois: a list of [vimms.Roi.Roi][] objects.

        Returns: a boolean array with one indicator per ROI. An entry is
                 False when the ROI's `last_frag_rt` lies within `rt_tol`
                 of `current_rt`, and True otherwise (including when
                 `last_frag_rt` is None).
        """
        # None becomes NaN in a float array, and any comparison against NaN
        # is False, so ROIs without a last_frag_rt always pass the filter
        last_frag_rts = np.array([roi.last_frag_rt for roi in rois], dtype=np.double)
        elapsed = current_rt - last_frag_rts
        still_excluded = elapsed <= self.rt_tol
        return np.logical_not(still_excluded)

__init__(rt_tol)

Initialises a dynamic exclusion filter based on time only

Parameters:
  • rt_tol

    the RT tolerance (in seconds)

Source code in vimms/Exclusion.py
492
493
494
495
496
497
498
def __init__(self, rt_tol):
    """
    Initialises a dynamic exclusion filter based on time only

    Args:
        rt_tol: the RT tolerance (in seconds)
    """
    self.rt_tol = rt_tol

filter(current_rt, rois)

Check which ROIs are outside the dynamic exclusion window, i.e. were not fragmented within rt_tol of the current RT

Parameters:
  • current_rt

    the current RT value

  • rois

    a list of vimms.Roi.Roi objects.

Returns: an array of indicators for the filter

Source code in vimms/Exclusion.py
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
def filter(self, current_rt, rois):
    """
    Check which ROIs are outside the dynamic exclusion window

    Args:
        current_rt: the current RT value
        rois: a list of [vimms.Roi.Roi][] objects.

    Returns: a boolean array with one indicator per ROI. An entry is
             False when the ROI's `last_frag_rt` lies within `rt_tol`
             of `current_rt`, and True otherwise (including when
             `last_frag_rt` is None).
    """
    # None becomes NaN in a float array, and any comparison against NaN
    # is False, so ROIs without a last_frag_rt always pass the filter
    last_frag_rts = np.array([roi.last_frag_rt for roi in rois], dtype=np.double)
    elapsed = current_rt - last_frag_rts
    still_excluded = elapsed <= self.rt_tol
    return np.logical_not(still_excluded)

ExclusionItem

A class to store the item to exclude when computing dynamic exclusion window

Source code in vimms/Exclusion.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class ExclusionItem:
    """
    A class to store the item to exclude when computing dynamic
    exclusion window
    """

    def __init__(self, from_mz, to_mz, from_rt, to_rt, frag_at):
        """
        Creates a dynamic exclusion item

        Args:
            from_mz: m/z lower bounding box
            to_mz: m/z upper bounding box
            from_rt: RT lower bounding box
            to_rt: RT upper bounding box
            frag_at: RT when this ExclusionItem is fragmented
        """
        self.from_mz = from_mz
        self.to_mz = to_mz
        self.from_rt = from_rt
        self.to_rt = to_rt
        self.frag_at = frag_at
        self.mz = (self.from_mz + self.to_mz) / 2.0  # midpoint of the m/z bounds
        self.rt = self.frag_at
        self.counter = 0  # incremented through increment_counter()

    def peak_in(self, mz, rt):
        """
        Checks that a peak described by its (mz, rt) values lies in this box.

        Args:
            mz: the m/z value to check
            rt: the RT value to check

        Returns: True if it's a match, False otherwise.

        """
        return bool(self.rt_match(rt) and self.mz_match(mz))

    def increment_counter(self):
        """
        Increase the counter of this item by one
        """
        self.counter += 1

    def rt_match(self, rt):
        """
        Checks that a certain RT point lies in this box (bounds inclusive)

        Args:
            rt: the RT value to check

        Returns: True if it's a match, False otherwise.

        """
        return bool(self.from_rt <= rt <= self.to_rt)

    def mz_match(self, mz):
        """
        Checks that a certain m/z point lies in this box (bounds inclusive)

        Args:
            mz: the m/z value to check

        Returns: True if it's a match, False otherwise.

        """
        return bool(self.from_mz <= mz <= self.to_mz)

    def __repr__(self):
        return "ExclusionItem mz=(%f, %f) rt=(%f-%f)" % (
            self.from_mz,
            self.to_mz,
            self.from_rt,
            self.to_rt,
        )

    def __lt__(self, other):
        """
        Order items by their lower m/z bound.

        The comparison is strict, so that an item is never "less than"
        itself or another item with the same lower bound.
        """
        return bool(self.from_mz < other.from_mz)

__init__(from_mz, to_mz, from_rt, to_rt, frag_at)

Creates a dynamic exclusion item

Parameters:
  • from_mz

    m/z lower bounding box

  • to_mz

    m/z upper bounding box

  • from_rt

    RT lower bounding box

  • to_rt

    RT upper bounding box

  • frag_at

    RT when this ExclusionItem is fragmented

Source code in vimms/Exclusion.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def __init__(self, from_mz, to_mz, from_rt, to_rt, frag_at):
    """
    Creates a dynamic exclusion item

    Args:
        from_mz: m/z lower bounding box
        to_mz: m/z upper bounding box
        from_rt: RT lower bounding box
        to_rt: RT upper bounding box
        frag_at: RT when this ExclusionItem is fragmented
    """
    self.from_mz = from_mz
    self.to_mz = to_mz
    self.from_rt = from_rt
    self.to_rt = to_rt
    self.frag_at = frag_at
    # the item's own m/z is the midpoint of its m/z bounds
    self.mz = (self.from_mz + self.to_mz) / 2.0
    # the item's own RT is the fragmentation time
    self.rt = self.frag_at
    self.counter = 0  # add a counter field

mz_match(mz)

Checks that a certain m/z point lies in this box

Parameters:
  • mz

    the m/z value to check

Returns: True if it's a match, False otherwise.

Source code in vimms/Exclusion.py
76
77
78
79
80
81
82
83
84
85
86
87
88
def mz_match(self, mz):
    """
    Tell whether an m/z value lies within this box's m/z bounds
    (both bounds inclusive)

    Args:
        mz: the m/z value to check

    Returns: True if it's a match, False otherwise.

    """
    return bool(self.from_mz <= mz <= self.to_mz)

peak_in(mz, rt)

Checks that a peak described by its (mz, rt) values lies in this box.

Parameters:
  • mz

    the m/z value to check

  • rt

    the RT value to check

Returns: True if it's a match, False otherwise.

Source code in vimms/Exclusion.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def peak_in(self, mz, rt):
    """
    Tell whether a peak, given by its (mz, rt) values, lies in this box

    Args:
        mz: the m/z value to check
        rt: the RT value to check

    Returns: True if it's a match, False otherwise.

    """
    # RT is tested first; the m/z test only runs when the RT matches
    return bool(self.rt_match(rt) and self.mz_match(mz))

rt_match(rt)

Checks that a certain RT point lies in this box

Parameters:
  • rt

    the RT value to check

Returns: True if it's a match, False otherwise.

Source code in vimms/Exclusion.py
62
63
64
65
66
67
68
69
70
71
72
73
74
def rt_match(self, rt):
    """
    Tell whether an RT value lies within this box's RT bounds
    (both bounds inclusive)

    Args:
        rt: the RT value to check

    Returns: True if it's a match, False otherwise.

    """
    return bool(self.from_rt <= rt <= self.to_rt)

LengthFilter

Bases: ScoreFilter

A class that implements a check on minimum length of ROI for fragmentation

Source code in vimms/Exclusion.py
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
class LengthFilter(ScoreFilter):
    """
    A class that implements a check on minimum length of ROI for fragmentation
    """

    def __init__(self, min_roi_length_for_fragmentation):
        """
        Initialise a length filter

        Args:
            min_roi_length_for_fragmentation: the minimum length of ROI for fragmentation
        """
        self.min_roi_length_for_fragmentation = min_roi_length_for_fragmentation

    def filter(self, roi_lengths):
        """
        Check that ROI lengths are at or above the threshold

        Args:
            roi_lengths: a numpy array (or any array-like, e.g. a list)
                         of ROI lengths

        Returns: a boolean array indicating, for each length, whether it
                 is greater than or equal to the minimum length

        """
        # coerce to an array so plain lists are compared element-wise
        # instead of raising a TypeError
        return np.asarray(roi_lengths) >= self.min_roi_length_for_fragmentation

__init__(min_roi_length_for_fragmentation)

Initialise a length filter

Parameters:
  • min_roi_length_for_fragmentation

    the minimum length of ROI for fragmentation

Source code in vimms/Exclusion.py
554
555
556
557
558
559
560
561
def __init__(self, min_roi_length_for_fragmentation):
    """
    Initialise a length filter

    Args:
        min_roi_length_for_fragmentation: the minimum length of ROI for
            fragmentation; `filter` compares ROI lengths against it with `>=`
    """
    self.min_roi_length_for_fragmentation = min_roi_length_for_fragmentation

filter(roi_lengths)

Check that ROI lengths are at or above the threshold

Parameters:
  • roi_lengths

    a numpy array of ROI lengths

Returns: an array of indicator whether the lengths are above threshold

Source code in vimms/Exclusion.py
563
564
565
566
567
568
569
570
571
572
def filter(self, roi_lengths):
    """
    Check that ROI lengths are at or above the threshold

    Args:
        roi_lengths: a numpy array (or any array-like, e.g. a list)
                     of ROI lengths

    Returns: a boolean array indicating, for each length, whether it
             is greater than or equal to the minimum length

    """
    # coerce to an array so plain lists are compared element-wise
    # instead of raising a TypeError
    return np.asarray(roi_lengths) >= self.min_roi_length_for_fragmentation

MinIntensityFilter

Bases: ScoreFilter

A class that implements minimum intensity filter

Source code in vimms/Exclusion.py
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
class MinIntensityFilter(ScoreFilter):
    """
    A class that implements minimum intensity filter
    """

    def __init__(self, min_ms1_intensity):
        """
        Initialises the minimum intensity filter

        Args:
            min_ms1_intensity: the minimum intensity to check
        """
        self.min_ms1_intensity = min_ms1_intensity

    def filter(self, intensities):
        """
        Check which intensity values are strictly above the threshold

        Args:
            intensities: an array of intensity values

        Returns: a boolean array with one indicator per intensity value

        """
        values = np.array(intensities)
        return values > self.min_ms1_intensity

__init__(min_ms1_intensity)

Initialises the minimum intensity filter

Parameters:
  • min_ms1_intensity

    the minimum intensity to check

Source code in vimms/Exclusion.py
467
468
469
470
471
472
473
def __init__(self, min_ms1_intensity):
    """
    Initialises the minimum intensity filter

    Args:
        min_ms1_intensity: the minimum intensity to check; `filter`
            keeps values strictly greater than it
    """
    self.min_ms1_intensity = min_ms1_intensity

filter(intensities)

Check which intensity values are strictly above the threshold

Parameters:
  • intensities

    an array of intensity values

Returns: an array of indicators for the filter

Source code in vimms/Exclusion.py
475
476
477
478
479
480
481
482
483
484
def filter(self, intensities):
    """
    Check which intensity values are strictly above the threshold

    Args:
        intensities: an array of intensity values

    Returns: a boolean array with one indicator per intensity value

    """
    values = np.array(intensities)
    return values > self.min_ms1_intensity

ScoreFilter

Bases: ABC

Base class for various filters

Source code in vimms/Exclusion.py
452
453
454
455
456
457
458
459
class ScoreFilter(ABC):
    """
    Abstract base class for the various filters.
    Concrete filters have to override `filter`.
    """

    @abstractmethod
    def filter(self):
        """
        Apply the filter. This base implementation does nothing and
        returns None; subclasses provide the actual check.
        """

SmartROIFilter

Bases: ScoreFilter

A class that implements SmartROI filtering criteria. For more details, refer to our paper 'Rapid Development ...'

Source code in vimms/Exclusion.py
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
class SmartROIFilter(ScoreFilter):
    """
    A class that implements SmartROI filtering criteria.
    For more details, refer to our paper 'Rapid Development ...'
    """

    def filter(self, rois):
        """
        Filter ROIs based on SmartROI rules.

        Args:
            rois: a list of [vimms.Roi.Roi][] objects. The indicator of each
                  ROI is read from its `can_fragment` attribute. If this is
                  a normal ROI object, always return True for everything
                  otherwise track the status based on the SmartROI rules

        Returns: an array of indicator whether ROI can be fragmented or not.

        """
        flags = [roi.can_fragment for roi in rois]
        return np.array(flags)

filter(rois)

Filter ROIs based on SmartROI rules.

Parameters:
  • rois

    a list of [vimms.Roi.Roi] objects. if this is a normal ROI object, always return True for everything otherwise track the status based on the SmartROI rules

Returns: an array of indicator whether ROI can be fragmented or not.

Source code in vimms/Exclusion.py
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
def filter(self, rois):
    """
    Filter ROIs based on SmartROI rules.

    Args:
        rois: a list of [vimms.Roi.Roi][] objects. The indicator of each
              ROI is read from its `can_fragment` attribute. If this is
              a normal ROI object, always return True for everything
              otherwise track the status based on the SmartROI rules

    Returns: an array of indicator whether ROI can be fragmented or not.

    """
    flags = [roi.can_fragment for roi in rois]
    return np.array(flags)

TopNExclusion

A class that performs standard dynamic exclusion for Top-N. This is based on checking whether an m/z and RT value lies in certain exclusion boxes.

Source code in vimms/Exclusion.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
class TopNExclusion:
    """
    A class that performs standard dynamic exclusion for Top-N.
    This is based on checking whether an m/z and RT value lies in certain exclusion boxes.
    """

    def __init__(
        self, mz_tol, rt_tol, exclude_after_n_times=1, exclude_t0=0, initial_exclusion_list=None
    ):
        """
        Initialise a Top-N dynamic exclusion object

        Args:
            mz_tol: the m/z tolerance (in ppm) used when creating exclusion boxes
            rt_tol: the RT tolerance (in seconds) used for boxes in the
                    main dynamic exclusion list
            exclude_after_n_times: the counter value a box in the initial
                    check list has to reach before the ion is excluded
            exclude_t0: the RT tolerance used for boxes in the initial check
                    list. If it is not greater than 0, scheduled precursors
                    are added directly to the main dynamic exclusion list.
            initial_exclusion_list: optional boxes to pre-load into the main
                    dynamic exclusion list
        """
        self.mz_tol = mz_tol
        self.rt_tol = rt_tol
        self.exclude_after_n_times = exclude_after_n_times
        self.exclude_t0 = exclude_t0

        self.exclude_check = BoxHolder()
        self.dynamic_exclusion = BoxHolder()

        # Initialise 'dynamic_exclusion' with its initial value, if provided
        if initial_exclusion_list is not None:
            for initial in initial_exclusion_list:
                self.dynamic_exclusion.add_box(initial)

    def is_excluded(self, mz, rt):
        """
        Checks if a pair of (mz, rt) value is currently excluded by
        dynamic exclusion window.

        Note that this is not a pure query: every box of the initial check
        list that contains the point has its counter incremented, and a new
        box may be added to the main dynamic exclusion list.

        Args:
            mz: m/z value
            rt: RT value

        Returns: a tuple (excluded, weight), which is (True, 0.0) if the
                 point is excluded and (False, 1.0) otherwise.

        """
        # check the main dynamic exclusion list to see if this ion should be excluded
        if self.dynamic_exclusion.is_in_box(mz, rt):
            return True, 0.0

        # if not excluded, increment all boxes of the initial list that contain
        # this (mz, rt) point and check if any of them has reached the threshold
        found = False
        for box in self.exclude_check.check_point(mz, rt):
            box.increment_counter()
            if box.counter >= self.exclude_after_n_times:
                found = True

        # if some box has reached the threshold, exclude this ion from now on
        if found:
            x = self._get_exclusion_item(mz, rt, self.mz_tol, self.rt_tol)
            self.dynamic_exclusion.add_box(x)
            return True, 0.0

        # finally this ion is not excluded if it is not in either the main or initial lists
        return False, 1.0

    def update(self, current_scan, ms2_tasks):
        """
        For every scheduled MS2 scan, add its precursor m/z for initial exclusion check.
        A tolerance of exclude_t0 is used. If exclude_t0 is not greater than 0,
        the precursor is added directly to the dynamic exclusion list with a
        tolerance of rt_tol instead.

        Args:
            current_scan: the current MS1 scan
            ms2_tasks: scheduled ms2 tasks

        Returns: None

        """
        rt = current_scan.rt
        for task in ms2_tasks:
            for precursor in task.get("precursor_mz"):
                mz = precursor.precursor_mz

                # new way of checking DEW -- with an initial boxholder to check first
                if self.exclude_t0 > 0:
                    x = self._get_exclusion_item(mz, rt, self.mz_tol, self.exclude_t0)
                    self.exclude_check.add_box(x)

                else:  # fallback to the old way by adding directly to the DEW boxholder
                    x = self._get_exclusion_item(mz, rt, self.mz_tol, self.rt_tol)
                    self.dynamic_exclusion.add_box(x)

    def _get_exclusion_item(self, mz, rt, mz_tol, rt_tol):
        """
        Create a new [vimms.Exclusion.ExclusionItem][] object based on the (mz, rt) values
        as well as the tolerances. The box is centred on (mz, rt) and
        extends by the tolerances in both directions.

        Args:
            mz: the m/z value
            rt: the RT value
            mz_tol: the m/z tolerance (in ppm)
            rt_tol: the RT tolerance (in seconds)

        Returns: a new [vimms.Exclusion.ExclusionItem][] object

        """
        mz_lower = mz * (1 - mz_tol / 1e6)
        mz_upper = mz * (1 + mz_tol / 1e6)
        rt_lower = rt - rt_tol
        # I think this is mostly for topN (iterative) exclusion method
        rt_upper = rt + rt_tol
        return ExclusionItem(
            from_mz=mz_lower, to_mz=mz_upper, from_rt=rt_lower, to_rt=rt_upper, frag_at=rt
        )

__init__(mz_tol, rt_tol, exclude_after_n_times=1, exclude_t0=0, initial_exclusion_list=None)

Initialise a Top-N dynamic exclusion object

Parameters:
  • mz_tol
  • rt_tol
  • exclude_after_n_times
  • exclude_t0
  • initial_exclusion_list
Source code in vimms/Exclusion.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def __init__(
    self, mz_tol, rt_tol, exclude_after_n_times=1, exclude_t0=0, initial_exclusion_list=None
):
    """
    Initialise a Top-N dynamic exclusion object

    Args:
        mz_tol: the m/z tolerance, stored as `self.mz_tol`
        rt_tol: the RT tolerance, stored as `self.rt_tol`
        exclude_after_n_times: stored as `self.exclude_after_n_times`
        exclude_t0: stored as `self.exclude_t0`
        initial_exclusion_list: optional boxes to pre-load into the
                `dynamic_exclusion` box holder
    """
    # tolerances and thresholds
    self.mz_tol, self.rt_tol = mz_tol, rt_tol
    self.exclude_after_n_times = exclude_after_n_times
    self.exclude_t0 = exclude_t0

    # two separate box holders, both empty to begin with
    self.exclude_check = BoxHolder()
    self.dynamic_exclusion = BoxHolder()

    # nothing to pre-load unless an initial list was given
    if initial_exclusion_list is None:
        return
    for initial_box in initial_exclusion_list:
        self.dynamic_exclusion.add_box(initial_box)

is_excluded(mz, rt)

Checks if a pair of (mz, rt) value is currently excluded by dynamic exclusion window

Parameters:
  • mz

    m/z value

  • rt

    RT value

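  • mz_tol

    m/z tolerance (not an argument of this method; the mz_tol given at construction is used)

  • rt_tol

    rt_tolerance (not an argument of this method; the rt_tol given at construction is used)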

Returns: True if excluded (with weight 0.0), False otherwise (weight 1.0).

Source code in vimms/Exclusion.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
def is_excluded(self, mz, rt):
    """
    Checks if a pair of (mz, rt) value is currently excluded by
    dynamic exclusion window.

    Note that this is not a pure query: every box of the initial check
    list that contains the point has its counter incremented, and a new
    box may be added to the main dynamic exclusion list.

    Args:
        mz: m/z value
        rt: RT value

    Returns: a tuple (excluded, weight), which is (True, 0.0) if the
             point is excluded and (False, 1.0) otherwise.

    """
    # first consult the main dynamic exclusion list
    if self.dynamic_exclusion.is_in_box(mz, rt):
        return True, 0.0

    # otherwise bump the counter of every initial-list box containing the
    # point (all of them, no early exit) and note if any reached the threshold
    threshold_reached = False
    for box in self.exclude_check.check_point(mz, rt):
        box.increment_counter()
        if box.counter >= self.exclude_after_n_times:
            threshold_reached = True

    # the ion is not excluded if neither list flagged it
    if not threshold_reached:
        return False, 1.0

    # a threshold was reached: exclude this ion from now on
    new_box = self._get_exclusion_item(mz, rt, self.mz_tol, self.rt_tol)
    self.dynamic_exclusion.add_box(new_box)
    return True, 0.0

update(current_scan, ms2_tasks)

For every scheduled MS2 scan, add its precursor m/z for initial exclusion check. A tolerance of exclude_t0 is used.

Parameters:
  • current_scan

    the current MS1 scan

  • ms2_tasks

    scheduled ms2 tasks

Returns: None

Source code in vimms/Exclusion.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def update(self, current_scan, ms2_tasks):
    """
    Register the precursor m/z of every scheduled MS2 scan as an exclusion item.

    Each precursor is paired with the RT of the current scan. When
    exclude_t0 is positive, the item is built with an RT tolerance of
    exclude_t0 and added to the initial exclusion check holder. Otherwise
    it is built with rt_tol and added directly to the dynamic exclusion
    holder (the old behaviour).

    Args:
        current_scan: the current MS1 scan
        ms2_tasks: scheduled ms2 tasks

    Returns: None

    """
    scan_rt = current_scan.rt
    for ms2_task in ms2_tasks:
        for prec in ms2_task.get("precursor_mz"):
            prec_mz = prec.precursor_mz

            # a positive exclude_t0 selects the initial boxholder (new way of
            # checking DEW); anything else falls back to the DEW boxholder
            use_initial_check = self.exclude_t0 > 0
            tolerance = self.exclude_t0 if use_initial_check else self.rt_tol
            item = self._get_exclusion_item(prec_mz, scan_rt, self.mz_tol, tolerance)

            holder = self.exclude_check if use_initial_check else self.dynamic_exclusion
            holder.add_box(item)

WeightedDEWExclusion

Bases: TopNExclusion

A class that performs weighted dynamic exclusion for Top-N. This is further described in our paper 'Rapid Development ...'

Source code in vimms/Exclusion.py
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
class WeightedDEWExclusion(TopNExclusion):
    """
    Weighted dynamic exclusion for Top-N: instead of a hard in/out decision,
    points inside an exclusion box receive a weight computed by compute_weight.
    This is further described in our paper 'Rapid Development ...'
    """

    def __init__(self, mz_tol, rt_tol, exclusion_t_0):
        """
        Initialises a weighted dynamic exclusion object

        Args:
            mz_tol: the m/z tolerance, forwarded to the parent initialiser
            rt_tol: the RT tolerance (in seconds), forwarded to the parent initialiser
            exclusion_t_0: WeightedDEW parameter; must not be greater than rt_tol

        Raises:
            ValueError: if exclusion_t_0 is greater than rt_tol
        """
        super().__init__(mz_tol, rt_tol)
        self.exclusion_t_0 = exclusion_t_0
        # self.rt_tol is not assigned in this class -- presumably set by the parent
        if self.exclusion_t_0 > self.rt_tol:
            raise ValueError("exclusion_t_0 must be lte rt_tol")

    def is_excluded(self, mz, rt):
        """
        Computes the exclusion flag and weight for the point (mz, rt)

        Args:
            mz: m/z value
            rt: RT value

        Returns: a tuple (flag, weight). If no dynamic exclusion box contains
                 the point this is (False, 1.0); otherwise weight is the
                 smallest weight over all containing boxes and flag is False
                 only when that weight equals 1.0.

        """
        containing = self.dynamic_exclusion.check_point(mz, rt)
        if len(containing) == 0:
            return False, 1.0

        # one weight per containing box; keep the min -- seems to work well
        lowest = min(
            compute_weight(rt, item.frag_at, self.rt_tol, self.exclusion_t_0)[1]
            for item in containing
        )
        if lowest == 1.0:
            return False, lowest
        return True, lowest

__init__(mz_tol, rt_tol, exclusion_t_0)

Initialises a weighted dynamic exclusion object. Args: mz_tol: the m/z tolerance; rt_tol: the RT tolerance (in seconds); exclusion_t_0: WeightedDEW parameter.

Source code in vimms/Exclusion.py
383
384
385
386
387
388
389
390
391
392
393
def __init__(self, mz_tol, rt_tol, exclusion_t_0):
    """
    Initialises a weighted dynamic exclusion object

    Args:
        mz_tol: the m/z tolerance, forwarded to the parent initialiser
        rt_tol: the RT tolerance (in seconds), forwarded to the parent initialiser
        exclusion_t_0: WeightedDEW parameter; must not be greater than rt_tol

    Raises:
        ValueError: if exclusion_t_0 is greater than rt_tol
    """
    super().__init__(mz_tol, rt_tol)
    self.exclusion_t_0 = exclusion_t_0
    # the check reads self.rt_tol, which is not assigned here -- presumably
    # set by the parent initialiser; the attribute above is stored before
    # the validation runs
    if self.exclusion_t_0 > self.rt_tol:
        raise ValueError("exclusion_t_0 must be lte rt_tol")

WeightedDEWFilter

Bases: ScoreFilter

A class that implements weighted dynamic exclusion filter

Source code in vimms/Exclusion.py
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
class WeightedDEWFilter(ScoreFilter):
    """
    A score filter that weights ROIs through a weighted dynamic exclusion object
    """

    def __init__(self, exclusion):
        """
        Initialises a weighted dynamic exclusion filter

        Args:
            exclusion: the exclusion object to query; filter() calls its
                       is_excluded(mz, rt) method and expects a 2-tuple back
        """
        self.exclusion = exclusion

    def filter(self, current_rt, rois):
        """
        Computes a weighted dynamic exclusion weight for each ROI

        Args:
            current_rt: the current RT value (not used in the computation;
                        the RT of each ROI's last datum is used instead)
            rois: a list of [vimms.Roi.Roi][] objects.

        Returns: a numpy array of weights for each ROI.

        """

        def _weight_for(roi):
            # the last datum must unpack to exactly (mz, rt, intensity)
            mz, rt, _intensity = roi.get_last_datum()
            _flag, roi_weight = self.exclusion.is_excluded(mz, rt)
            return roi_weight

        return np.array([_weight_for(roi) for roi in rois])

__init__(exclusion)

Initialises a weighted dynamic exclusion filter

Parameters:
  • exclusion

    a vimms.Exclusion.ExclusionItem object

Source code in vimms/Exclusion.py
522
523
524
525
526
527
528
529
def __init__(self, exclusion):
    """
    Initialises a weighted dynamic exclusion filter

    Args:
        exclusion: a [vimms.Exclusion.ExclusionItem][] object
            NOTE(review): the filter method calls exclusion.is_excluded(mz, rt),
            so this looks like an exclusion object rather than a single
            exclusion item -- confirm the documented type.
    """
    # stored as given; no validation is performed here
    self.exclusion = exclusion

filter(current_rt, rois)

Check whether ROIs are excluded or not based on the weighted dynamic exclusion filter. Args: current_rt: the current RT value; rois: a list of vimms.Roi.Roi objects.

Returns: a numpy array of weights for each ROI.

Source code in vimms/Exclusion.py
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
def filter(self, current_rt, rois):
    """
    Computes a weighted dynamic exclusion weight for each ROI

    Args:
        current_rt: the current RT value (not used in the computation;
                    the RT of each ROI's last datum is used instead)
        rois: a list of [vimms.Roi.Roi][] objects.

    Returns: a numpy array of weights for each ROI.

    """

    def _weight_for(roi):
        # the last datum must unpack to exactly (mz, rt, intensity)
        mz, rt, _intensity = roi.get_last_datum()
        _flag, roi_weight = self.exclusion.is_excluded(mz, rt)
        return roi_weight

    return np.array([_weight_for(roi) for roi in rois])

compute_weight(current_rt, frag_at, rt_tol, exclusion_t_0)

Compute the weight for current RT at frag_at given the RT tolerance and other parameters. Args: current_rt: the current RT value; frag_at: the retention time when fragmentation last occurred; rt_tol: RT tolerance (in seconds); exclusion_t_0: weighted DEW parameter.

Returns: a new weight

Source code in vimms/Exclusion.py
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
def compute_weight(current_rt, frag_at, rt_tol, exclusion_t_0):
    """
    Work out the WeightedDEW exclusion flag and weight at current_rt for an
    ion last fragmented at frag_at.

    Args:
        current_rt: the current RT value
        frag_at: the retention time when fragmentation last occurred,
                 or None if the ion has never been fragmented
        rt_tol: RT tolerance (in seconds)
        exclusion_t_0: weighted DEW parameter

    Returns: a tuple (excluded, weight):
             (False, 1.0) if frag_at is None or current_rt >= frag_at + rt_tol,
             (True, 0.0) if current_rt <= frag_at + exclusion_t_0,
             otherwise (True, w) with w rising linearly between those two times.

    """
    # never been fragmented before, always include (weight 1.0)
    if frag_at is None:
        return False, 1.0

    # at or past the end of the rt_tol window, always include (weight 1.0)
    if current_rt >= frag_at + rt_tol:
        return False, 1.0

    # fragmented and still within exclusion_t_0, always exclude (weight 0.0)
    full_exclusion_end = frag_at + exclusion_t_0
    if current_rt <= full_exclusion_end:
        return True, 0.0

    # in between: weight according to the WeightedDEW scheme
    ramp_length = rt_tol - exclusion_t_0
    weight = (current_rt - full_exclusion_end) / ramp_length
    if weight > 1:
        # not expected given the checks above; reported rather than raised
        logger.warning(
            "exclusion weight %f is greater than 1 ("
            "current_rt %f exclusion_t_0 %f frag_at %f "
            "rt_tol %f)" % (weight, current_rt, exclusion_t_0, frag_at, rt_tol)
        )
    return True, weight