Skip to content

Documentation

Succinct Multiple Alignment class

SuccinctMultipleAlignment

Examples:

>>> msa = SuccinctMultipleAlignment("tests/align1.fa", ratio_other_vector=.25)
>>> len(msa)
16
>>> msa.get_nb_sequences()
5
>>> msa.get_consensus()
'CGTATCAGCNTACGAT'
>>> msa.get_sequence(0)
'AGTATCAGCATA-GAT'
>>> msa.get_sequence(4)
'CGTTTCAGCATA-GAT'
>>> msa.get_kept_nucleotides(0)
'AC'
>>> msa.get_kept_nucleotides(1)
'G'
>>> msa.get_kept_nucleotides(12)
'-C-'
>>> msa.get_column(0).nt_pos() == {'A': [(0,1)], 'C': [(2, 4)]}
True
>>> msa.get_column(1).nt_pos() == {'G': [(0,4)]}
True
>>> msa.get_column(12).nt_pos() == {'-': [(0,0), (4, 4)], 'C': [(1,3)]}
True
>>> msa.get_column(12).nt_frequency() == {'-': .4, 'C': .6}
True
>>> msa.get_column(9).get_nb_changes()
5
>>> len(msa.get_column(3))
5
>>> msa.get_column(5) == msa[5]
True
>>> msa.get_column(1).get_vector_type()
<class 'pysdsl.pysdsl.SDVector'>
>>> msa.get_column(0).get_vector_type()
<class 'pysdsl.pysdsl.RamanRamanRaoVector63'>
>>> import tempfile
>>> import os
>>> f = tempfile.NamedTemporaryFile(delete=False)
>>> msa.store_to_file(f.name)
>>> del msa
>>> msa = SuccinctMultipleAlignment.load(f.name)
>>> os.unlink(f.name)
>>> len(msa)
16
>>> msa.get_nb_sequences()
5
>>> msa.get_consensus()
'CGTATCAGCNTACGAT'
>>> msa.get_sequence(0)
'AGTATCAGCATA-GAT'
>>> msa.get_sequence(4)
'CGTTTCAGCATA-GAT'
>>> msa.get_kept_nucleotides(0)
'AC'
>>> msa.get_kept_nucleotides(1)
'G'
>>> msa.get_kept_nucleotides(12)
'-C-'
>>> msa.get_column(0).nt_pos() == {'A': [(0,1)], 'C': [(2, 4)]}
True
>>> msa.get_column(1).nt_pos() == {'G': [(0,4)]}
True
>>> msa.get_column(12).nt_pos() == {'-': [(0,0), (4, 4)], 'C': [(1,3)]}
True
>>> msa.get_column(12).nt_frequency() == {'-': .4, 'C': .6}
True
>>> msa.get_column(9).get_nb_changes()
5
>>> len(msa.get_column(3))
5
>>> msa.get_column(5) == msa[5]
True
>>> msa.get_column(1).get_vector_type()
<class 'pysdsl.pysdsl.SDVector'>
>>> msa.get_column(0).get_vector_type()
<class 'pysdsl.pysdsl.RamanRamanRaoVector63'>
Source code in succinct_multiple_alignment.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
class SuccinctMultipleAlignment:
    '''
    Multiple sequence alignment stored column by column as SuccinctColumn objects.

    For every column, a bit vector marks the sequences whose nucleotide differs from
    the nucleotide of the previous sequence, and only those nucleotides are kept.

    Examples:
        >>> msa = SuccinctMultipleAlignment("tests/align1.fa", ratio_other_vector=.25)
        >>> len(msa)
        16
        >>> msa.get_nb_sequences()
        5
        >>> msa.get_consensus()
        'CGTATCAGCNTACGAT'
        >>> msa.get_sequence(0)
        'AGTATCAGCATA-GAT'
        >>> msa.get_sequence(4)
        'CGTTTCAGCATA-GAT'
        >>> msa.get_kept_nucleotides(0)
        'AC'
        >>> msa.get_kept_nucleotides(1)
        'G'
        >>> msa.get_kept_nucleotides(12)
        '-C-'
        >>> msa.get_column(0).nt_pos() == {'A': [(0,1)], 'C': [(2, 4)]}
        True
        >>> msa.get_column(1).nt_pos() == {'G': [(0,4)]}
        True
        >>> msa.get_column(12).nt_pos() == {'-': [(0,0), (4, 4)], 'C': [(1,3)]}
        True
        >>> msa.get_column(12).nt_frequency() == {'-': .4, 'C': .6}
        True
        >>> msa.get_column(9).get_nb_changes()
        5
        >>> len(msa.get_column(3))
        5
        >>> msa.get_column(5) == msa[5]
        True
        >>> msa.get_column(1).get_vector_type()
        <class 'pysdsl.pysdsl.SDVector'>
        >>> msa.get_column(0).get_vector_type()
        <class 'pysdsl.pysdsl.RamanRamanRaoVector63'>
        >>> import tempfile
        >>> import os
        >>> f = tempfile.NamedTemporaryFile(delete=False)
        >>> msa.store_to_file(f.name)
        >>> del msa
        >>> msa = SuccinctMultipleAlignment.load(f.name)
        >>> os.unlink(f.name)
        >>> len(msa)
        16
        >>> msa.get_nb_sequences()
        5
        >>> msa.get_consensus()
        'CGTATCAGCNTACGAT'
        >>> msa.get_sequence(0)
        'AGTATCAGCATA-GAT'
        >>> msa.get_sequence(4)
        'CGTTTCAGCATA-GAT'
        >>> msa.get_kept_nucleotides(0)
        'AC'
        >>> msa.get_kept_nucleotides(1)
        'G'
        >>> msa.get_kept_nucleotides(12)
        '-C-'
        >>> msa.get_column(0).nt_pos() == {'A': [(0,1)], 'C': [(2, 4)]}
        True
        >>> msa.get_column(1).nt_pos() == {'G': [(0,4)]}
        True
        >>> msa.get_column(12).nt_pos() == {'-': [(0,0), (4, 4)], 'C': [(1,3)]}
        True
        >>> msa.get_column(12).nt_frequency() == {'-': .4, 'C': .6}
        True
        >>> msa.get_column(9).get_nb_changes()
        5
        >>> len(msa.get_column(3))
        5
        >>> msa.get_column(5) == msa[5]
        True
        >>> msa.get_column(1).get_vector_type()
        <class 'pysdsl.pysdsl.SDVector'>
        >>> msa.get_column(0).get_vector_type()
        <class 'pysdsl.pysdsl.RamanRamanRaoVector63'>
    '''

    def __init__(self, fasta_file, nb_columns=1000, vector=pysdsl.SDVector, compressed=False, other_vector_type='pysdsl.RamanRamanRaoVector63', ratio_other_vector=.1):
        """
        Build the succinct multiple alignment as a list of SuccinctColumn.

        Parameters:
            fasta_file (str): A FASTA file containing multiple sequences aligned.
                A falsy value (e.g. None) creates an empty alignment, as `load` does.
            nb_columns (int, optional): Number of columns built per pass over the FASTA file. Defaults to 1000.
            vector (type, optional): Kept for backward compatibility; this constructor does not use it.
            compressed (bool, optional): Whether the input is compressed or not. Defaults to False.
            other_vector_type (str, optional): Dotted name of the alternative bit vector class,
                forwarded to `vector_type` when the columns are built.
            ratio_other_vector (float, optional): Ratio forwarded to `vector_type` when the columns are built. Defaults to 0.1.

        Raises:
            FileNotFoundError: If `fasta_file` is not an existing file.
            ValueError: If `nb_columns` is not positive, if the FASTA file contains no sequence,
                or if its sequences do not all have the same length.
        """
        self.__multialign = []
        self.__other_vector = other_vector_type
        self.__ratio_other_vector = ratio_other_vector
        # Defined up front so that an empty instance still answers get_info() / get_nb_sequences().
        self.__size = 0
        self.__length = 0
        if fasta_file:
            if nb_columns < 1:
                # A negative step would silently produce an alignment without any column.
                raise ValueError("nb_columns must be a positive integer, got {}".format(nb_columns))
            self.__project_name = os.path.basename(fasta_file).split('.')[0]
            self.__size, self.__length = self.__fetch_alignment_size(fasta_file, compressed)
            traverse_columns = range(0, self.__length, nb_columns)
            self.__progress = tqdm(total=len(traverse_columns) * self.__size)
            try:
                for position in traverse_columns:
                    self.__multialign += self.__fetch_column(fasta_file, position, nb_columns, compressed)
            finally:
                # Close the progress bar even when reading the FASTA file fails.
                self.__progress.close()

    def __len__(self):
        """Return the number of columns of the alignment."""
        return len(self.__multialign)

    def __getitem__(self, index):
        """Return the column at `index` (same as `get_column`)."""
        return self.get_column(index)

    @staticmethod
    def load(input_file):
        '''
        Load a previously saved multiple alignment

        Parameters:
            input_file (str): The filename to load

        Returns:
            (SuccinctMultipleAlignment): The loaded SuccinctMultipleAlignment
        '''
        msa = SuccinctMultipleAlignment(None)
        msa.__project_name = os.path.basename(input_file).split('.')[0]
        msa.__multialign, msa.__size, msa.__length, msa.__other_vector = msa.load_from_file(input_file)
        return msa

    @staticmethod
    def __vector_type_name(vector_type_ref):
        """
        Return the dotted name under which a vector type is written to `info.txt`.

        After `load`, the alternative vector type is held as a class rather than as the
        dotted string given to the constructor; formatting the class directly would write
        its repr, which cannot be resolved again when the archive is reloaded.

        Parameters:
            vector_type_ref (str or type): The dotted name or the class itself.

        Returns:
            (str): A dotted name (or the plain string conversion when no name is available).
        """
        if isinstance(vector_type_ref, str):
            return vector_type_ref
        module = getattr(vector_type_ref, '__module__', None)
        qualname = getattr(vector_type_ref, '__qualname__', None)
        if module is None or qualname is None:
            return str(vector_type_ref)
        # NOTE(review): assumes the class is importable as <module>.<qualname> - confirm for pysdsl types.
        return '{}.{}'.format(module, qualname)

    @staticmethod
    def __fetch_alignment_size(fasta_file, compressed = False):
        """
        Read the FASTA file and return the number of sequences and their common length.

        Parameters:
            fasta_file (str): A FASTA file containing multiple sequences aligned.
            compressed (bool, optional): Whether the input is compressed or not. Defaults to False.

        Returns:
            (tuple of int):
                - The number of sequences.
                - The length of the sequences (which is supposed to be the same for every sequence).

        Raises:
            FileNotFoundError: If `fasta_file` is not an existing file.
            ValueError: If the file contains no sequence or the sequences have different lengths.
        """
        if not os.path.isfile(fasta_file):
            raise FileNotFoundError(fasta_file)

        seq_count = 0  # sequence counter
        align_length = None
        with openfile(fasta_file, "r", compressed) as handle:
            for record in SeqIO.parse(handle, 'fasta'):
                record_length = len(record.seq)
                if align_length is None:
                    # The first record defines the expected length.
                    align_length = record_length
                elif record_length != align_length:
                    raise ValueError(
                        "Sequence '{}' has length {} whereas {} was expected: the sequences are not aligned".format(
                            record.id, record_length, align_length))
                seq_count += 1
        if align_length is None:
            raise ValueError("No sequence found in '{}'".format(fasta_file))
        return (seq_count, align_length)

    def __fetch_column(self, fasta_file, position, nb_column, compressed=False):
        """
        Read the FASTA file and store 'nb_column' columns as SuccinctColumn objects in a list.
        To do that, it reads 'nb_column' nucleotides in each sequence, starting at the position 'position' in the file 'fasta_file'.

        Parameters:
            fasta_file (str): A FASTA file containing multiple sequences aligned.
            position (int): The position in the sequence where the search starts.
            nb_column (int): The number of columns to build in a single run.
            compressed (bool, optional): Whether the input is compressed or not. Defaults to False.

        Returns:
            (list of SuccinctColumn):
                A list of at most 'nb_column' SuccinctColumn objects corresponding to columns of the multialignment
                starting at the position 'position' (fewer when the end of the alignment is reached).
        """
        # Number of columns really built in this pass: the last pass may be shorter.
        width = max(0, min(nb_column, self.__length - position))
        bit_vectors = [pysdsl.BitVector(self.__size) for _ in range(width)]
        nt_kept = [''] * width
        # Starts empty so that the first sequence always registers as a change in every column.
        previous_nt = [''] * width
        seq_count = 0
        with openfile(fasta_file, "r", compressed) as handle:
            # Parallelization of the loop below was attempted, unsuccessfully: it was slower.
            for record in SeqIO.parse(handle, 'fasta'):
                # Nucleotides are stored upper-cased, so they are compared upper-cased as well:
                # a mere change of case must not be recorded as a change of nucleotide.
                chunk = str(record.seq[position:position + width]).upper()
                for pos in range(width):
                    nucleotide = chunk[pos]
                    if nucleotide != previous_nt[pos]:
                        bit_vectors[pos][seq_count] = 1
                        nt_kept[pos] += nucleotide
                        previous_nt[pos] = nucleotide
                seq_count += 1
                # Every record counts, including the first one, so that the bar reaches its total.
                self.__progress.update(1)

        return [
            SuccinctColumn(bitvector=bits, nt_kept=kept,
                           vector=vector_type(len(kept), len(bits), self.__ratio_other_vector, self.__other_vector))
            for bits, kept in zip(bit_vectors, nt_kept)
        ]

    def size_in_bytes(self):
        """
        Return the size in bytes of the entire succinct multiple alignment (sum of the size in bytes of all the SuccinctColumn objects).

        Returns:
            (int):
                The size in bytes of the entire succinct multiple alignment.
        """
        return sum(succinct_column.size_in_bytes() for succinct_column in self.__multialign)

    def get_column(self, index):
        '''
        Returns the column at index `index`.

        Parameters:
            index (int): Index of the column (starts at 0).

        Returns:
            (SuccinctColumn):
                The column at the requested index.

        Raises:
            IndexError: If `index` is negative or not smaller than the number of columns.
        '''
        # A real exception rather than an assert: the check must survive `python -O`, and
        # IndexError is what lets `for column in msa` stop cleanly through __getitem__.
        if not 0 <= index < len(self.__multialign):
            raise IndexError("Index {} doesn't exist in the alignment".format(index))

        return self.__multialign[index]

    def get_consensus(self, ratio_min=.5):
        r'''
        Computes and returns a consensus sequence.

        Parameters:
            ratio_min (float): The minimal ratio of the nucleotide in majority to add it to the consensus (N otherwise).

        Complexity:
            $\Omega(n)$, $O(n\times s)$, with $s$ the number of sequences and $n$ the number of columns.
            On average, $\Theta(n\times b)$, with $b$ the average number of changes in nucleotides in a given column.

        Returns:
            (str):
                A consensus sequence.
        '''
        sequence = ['N'] * len(self)

        for i in range(len(self)):
            freq = self.get_column(i).nt_frequency()
            # On ties, the first nucleotide yielded by the frequency mapping wins.
            max_nt = max(freq, key=freq.get)
            if freq[max_nt] >= ratio_min:
                sequence[i] = max_nt

        return ''.join(sequence)

    def get_nb_sequences(self):
        '''
        Returns:
            (int): the number of sequences in the multiple alignment
        '''

        return self.__size

    def get_nt(self, seq_index, position):
        """
        Return the nucleotide in the position specified in the sequence of index "seq_index".

        Parameters:
            seq_index (int): The index of the sequence to search in.
            position (int): The position to look at in the sequence.

        Returns:
            (str): The nucleotide in the position specified in the sequence of index "seq_index".
        """
        return self.__multialign[position].get_nt(seq_index)

    def get_sequence(self, seq_index):
        """
        Return the sequence of index "seq_index".

        Parameters:
            seq_index (int): The index of the sequence to search in.

        Returns:
            (str): The sequence of index "seq_index".
        """
        return "".join(self.get_nt(seq_index, position) for position in range(self.__length))

    def get_vector(self, index):
        """
        Returns the bit vector object of the SuccinctColumn corresponding to the 'index'-th column in the multiple alignment.

        Parameters:
            index (int): The column considered.

        Returns:
            The compacted bit vector of the specified column (a pysdsl.SDVector or the alternative vector type).
        """
        return self.__multialign[index].get_vector()

    def get_kept_nucleotides(self, index):
        """
        Returns the nucleotides - used to deduce the column's sequence from the bit vector - of the 'index'-th column.

        Parameters:
            index (int): The column considered.

        Returns:
            (str): The nucleotides kept in the SuccinctColumn object corresponding to the 'index'-th column in the multiple alignment.
        """
        return self.__multialign[index].get_kept_nucleotides()

    def get_info(self):
        """
        Return general information: the alignment length (length of the sequences) and the alignment size (number of sequences).

        Returns:
            (int): The alignment length (length of the sequences).
            (int): The alignment size (number of sequences).
        """
        return self.__length, self.__size

    def size_to_csv(self, file_name="size.csv", sort_by_size=True):
        """
        Save the size in bytes of each SuccinctColumn object in a CSV file.

        Each row holds the column index, the column size, the cumulative size of the rows
        written so far, the size of the column's bit vector and the number of kept nucleotides.

        Parameters:
            file_name (str, optional): The name of the CSV file to save the sizes. Default is "size.csv".
            sort_by_size (bool, optional): If True, the sizes will be sorted in ascending order. Default is True.
        """
        # One tuple per column: (index, column size, vector size, number of kept nucleotides).
        # NOTE(review): `size_in_bytes` is read without a call on the vector - presumably a
        # pysdsl property, whereas SuccinctColumn.size_in_bytes() is a method; confirm.
        sizes = [(i, column.size_in_bytes(), column.get_vector().size_in_bytes,
                  len(column.get_kept_nucleotides())) for i, column in enumerate(self.__multialign)]
        # Sort the columns by increasing size.
        if sort_by_size:
            sizes.sort(key=lambda row: row[1])
        # newline='' lets the csv module control line endings (no blank rows on Windows).
        with open(file_name, "w", newline='') as fileOut:
            writer = csv.writer(fileOut)
            # CSV header.
            writer.writerow(["Index", "column sorted by size", "cumulative column sizes", "vector size", "nt size"])
            cumulative_size = 0
            for i, size, vsize, ntsize in sizes:
                # Running total of the column sizes, in the order the rows are written.
                cumulative_size += size
                writer.writerow([i, size, cumulative_size, vsize, ntsize])

    def column_size_in_bytes(self, index):
        """
        Return the size in bytes of the SuccinctColumn object at the index.

        Parameters:
            index (int): The column considered.

        Returns:
            (int): The size in bytes of the selected SuccinctColumn object.
        """
        return self.__multialign[index].size_in_bytes()

    def store_to_file(self, output_file):
        """
        Store all the SuccinctColumn objects of the SuccinctMultipleAlignment in a gzip-compressed tar archive.

        The archive contains `info.txt` (number of sequences, length, alternative vector type),
        `nucleotides.txt`, one `<i>_column` entry per column and `types.sd`, a bit vector
        flagging the columns that do not use pysdsl.SDVector.

        Parameters:
            output_file (str): The path where the archive will be created.

        Raises:
            subprocess.CalledProcessError: If the `tar` command fails.
        """
        tmpdir = tempfile.mkdtemp()
        archive = '{}.tar.gz'.format(tmpdir)
        try:
            vector_types = pysdsl.BitVector(len(self))
            with open(tmpdir + '/info.txt', 'w') as fileOut:
                # The vector type is written as a dotted name so that it can be located again on load,
                # even when this alignment itself comes from `load` (where the type is held as a class).
                fileOut.write('{},{},{}'.format(self.__size, self.__length,
                                                self.__vector_type_name(self.__other_vector)))
            with open(tmpdir + '/nucleotides.txt', 'w') as nt_file:
                for i, succinct_column in enumerate(self.__multialign):
                    succinct_column.store_to_file(tmpdir + "/{}_column".format(i), nt_file)
                    if succinct_column.get_vector_type() != pysdsl.SDVector:
                        vector_types[i] = 1
            pysdsl.SDVector(vector_types).store_to_file(tmpdir + "/types.sd")

            # check_call: a failing `tar` must not go unnoticed and leave a missing or partial archive.
            subprocess.check_call(['tar', '-zcf', archive, '.'], cwd=tmpdir)
            shutil.move(archive, output_file)
        finally:
            # Never leave temporary data behind, whatever happened above.
            shutil.rmtree(tmpdir, ignore_errors=True)
            if os.path.exists(archive):
                os.remove(archive)

    def load_from_file(self, filename):
        """
        Create a SuccinctMultipleAlignment from the files produced by the store_to_file() function.

        Parameters:
            filename (str): The filename from which to recreate the saved SuccinctMultipleAlignment.

        Returns:
            (list): All the SuccinctColumn objects.
            (int): The number of sequences.
            (int): The length of the sequences (which is supposed to be the same for every sequence).
            (type): The alternative vector class located from the name stored in the archive.

        Raises:
            subprocess.CalledProcessError: If the `tar` command fails.
        """
        list_succinct_columns = []
        tmpdir = tempfile.mkdtemp()
        try:
            subprocess.check_call(['tar', '-zxf', '{}'.format(filename), '-C', '{}'.format(tmpdir)])
            with open(tmpdir + '/info.txt') as fileIn:
                info = fileIn.readline().strip().split(',')
                size = int(info[0])
                length = int(info[1])
                vtype = locate(info[2])
            vector_types = pysdsl.SDVector.load_from_file(tmpdir + "/types.sd")
            with open(tmpdir + '/nucleotides.txt') as nt_file:
                for i in range(length):
                    # A set bit in types.sd marks a column stored with the alternative vector type.
                    current_type = pysdsl.SDVector if not vector_types[i] else vtype
                    list_succinct_columns.append(SuccinctColumn.load(tmpdir + "/{}_column".format(i), nt_file, current_type))
        finally:
            shutil.rmtree(tmpdir, ignore_errors=True)
        return list_succinct_columns, size, length, vtype

    def find_columns_with_excessive_space(self, threshold_ratio=2):
        """
        Counts the columns that occupy significantly more space than the average column size.

        The average size, the total size and the size taken by the excessive columns are printed.

        Parameters:
            threshold_ratio (float, optional): The threshold ratio used to determine whether a column occupies significantly more space than the average column size.
                Default value is 2, meaning a column is considered to occupy significantly more space if its size is at least twice the average size.

        Returns:
            (tuple of int):
                - The number of columns whose size is at least `threshold_ratio` times the average size.
                - The total number of columns.
        """
        # Each size is computed once and reused for the average, the total and the threshold test.
        column_sizes = [succinct_column.size_in_bytes() for succinct_column in self.__multialign]
        size = sum(column_sizes)
        # An empty alignment has no average; 0.0 avoids a division by zero.
        average_size = size / len(column_sizes) if column_sizes else 0.0
        excessive_columns = [index for index, column_size in enumerate(column_sizes)
                             if column_size >= threshold_ratio * average_size]
        size_excessive = sum(column_sizes[index] for index in excessive_columns)
        print('moyenne:',average_size)
        print('total size :' ,size)
        print('excessive_columns size :', size_excessive)

        return len(excessive_columns), len(column_sizes)

__fetch_alignment_size(fasta_file, compressed=False) staticmethod

Read the FASTA file and return the number of sequences and the length of the sequences.

Parameters:

Name Type Description Default
fasta_file str

A FASTA file containing multiple sequences aligned.

required
compressed bool

Whether the input is compressed or not. Defaults to False.

False

Returns:

Type Description
tuple of int
  • The number of sequences.
  • The length of the sequences (which is supposed to be the same for every sequence).
Source code in succinct_multiple_alignment.py
@staticmethod
def __fetch_alignment_size(fasta_file, compressed = False):
    """
    Read the FASTA file and return the number of sequences and their common length.

    Parameters:
        fasta_file (str): A FASTA file containing multiple sequences aligned.
        compressed (bool, optional): Whether the input is compressed or not. Defaults to False.

    Returns:
        (tuple of int):
            - The number of sequences.
            - The length of the sequences (which is supposed to be the same for every sequence).

    Raises:
        FileNotFoundError: If `fasta_file` is not an existing file.
        ValueError: If the file contains no sequence or the sequences have different lengths.
    """
    if not os.path.isfile(fasta_file):
        raise FileNotFoundError(fasta_file)

    seq_count = 0  # sequence counter
    align_length = None
    with openfile(fasta_file, "r", compressed) as handle:
        for record in SeqIO.parse(handle, 'fasta'):
            record_length = len(record.seq)
            if align_length is None:
                # The first record defines the expected length.
                align_length = record_length
            elif record_length != align_length:
                raise ValueError(
                    "Sequence '{}' has length {} whereas {} was expected: the sequences are not aligned".format(
                        record.id, record_length, align_length))
            seq_count += 1
    if align_length is None:
        raise ValueError("No sequence found in '{}'".format(fasta_file))
    return (seq_count, align_length)

__fetch_column(fasta_file, position, nb_column, compressed=False)

Read the FASTA file and store 'nb_column' columns as SuccinctColumn objects in a list. To do that, it reads 'nb_column' nucleotides in each sequence, starting at the position 'position' in the file 'fasta_file'.

Parameters:

Name Type Description Default
fasta_file str

A FASTA file containing multiple sequences aligned.

required
position int

The position in the sequence where the search starts.

required
nb_column int

The number of columns to build in a single run.

required
compressed bool

Whether the input is compressed or not. Defaults to False.

False

Returns:

Type Description
list of SuccinctColumn

A list of SuccinctColumns objects of size 'nb_column' corresponding to columns of the multialignment starting at the position 'position'.

Source code in succinct_multiple_alignment.py
def __fetch_column(self, fasta_file, position, nb_column, compressed=False):
    """
    Read the FASTA file and store 'nb_column' columns as SuccinctColumn objects in a list.
    To do that, it reads 'nb_column' nucleotides in each sequence, starting at the position 'position' in the file 'fasta_file'.

    Parameters:
        fasta_file (str): A FASTA file containing multiple sequences aligned.
        position (int): The position in the sequence where the search starts.
        nb_column (int): The number of columns to build in a single run.
        compressed (bool, optional): Whether the input is compressed or not. Defaults to False.

    Returns:
        (list of SuccinctColumn):
            A list of at most 'nb_column' SuccinctColumn objects corresponding to columns of the multialignment
            starting at the position 'position' (fewer when the end of the alignment is reached).
    """
    # Number of columns really built in this pass: the last pass may be shorter.
    width = max(0, min(nb_column, self.__length - position))
    bit_vectors = [pysdsl.BitVector(self.__size) for _ in range(width)]
    nt_kept = [''] * width
    # Starts empty so that the first sequence always registers as a change in every column.
    previous_nt = [''] * width
    seq_count = 0
    with openfile(fasta_file, "r", compressed) as handle:
        # Parallelization of the loop below was attempted, unsuccessfully: it was slower.
        for record in SeqIO.parse(handle, 'fasta'):
            # Nucleotides are stored upper-cased, so they are compared upper-cased as well:
            # a mere change of case must not be recorded as a change of nucleotide.
            chunk = str(record.seq[position:position + width]).upper()
            for pos in range(width):
                nucleotide = chunk[pos]
                if nucleotide != previous_nt[pos]:
                    bit_vectors[pos][seq_count] = 1
                    nt_kept[pos] += nucleotide
                    previous_nt[pos] = nucleotide
            seq_count += 1
            # Every record counts, including the first one, so that the bar reaches its total.
            self.__progress.update(1)

    return [
        SuccinctColumn(bitvector=bits, nt_kept=kept,
                       vector=vector_type(len(kept), len(bits), self.__ratio_other_vector, self.__other_vector))
        for bits, kept in zip(bit_vectors, nt_kept)
    ]

__init__(fasta_file, nb_columns=1000, vector=pysdsl.SDVector, compressed=False, other_vector_type='pysdsl.RamanRamanRaoVector63', ratio_other_vector=0.1)

Build the succinct multiple alignment as a list of SuccinctColumn.

Parameters:

Name Type Description Default
fasta_file str

A FASTA file containing multiple sequences aligned.

required
vector str

Selection of the class representing the bit vector.

SDVector
Source code in succinct_multiple_alignment.py
def __init__(self, fasta_file, nb_columns=1000, vector=pysdsl.SDVector, compressed=False, other_vector_type='pysdsl.RamanRamanRaoVector63', ratio_other_vector=.1):
    """
    Build the succinct multiple alignment as a list of SuccinctColumn.

    Parameters:
        fasta_file (str): A FASTA file containing multiple sequences aligned.
            A falsy value (e.g. None) creates an empty alignment.
        nb_columns (int, optional): Number of columns built per pass over the FASTA file. Defaults to 1000.
        vector (type, optional): Kept for backward compatibility; this constructor does not use it.
        compressed (bool, optional): Whether the input is compressed or not. Defaults to False.
        other_vector_type (str, optional): Dotted name of the alternative bit vector class, stored for the column construction.
        ratio_other_vector (float, optional): Ratio stored for the column construction. Defaults to 0.1.

    Raises:
        ValueError: If `nb_columns` is not positive while a FASTA file is given.
    """
    self.__multialign = []
    self.__other_vector = other_vector_type
    self.__ratio_other_vector = ratio_other_vector
    # Defined up front so that an empty instance still has a size and a length.
    self.__size = 0
    self.__length = 0
    if fasta_file:
        if nb_columns < 1:
            # A negative step would silently produce an alignment without any column.
            raise ValueError("nb_columns must be a positive integer, got {}".format(nb_columns))
        self.__project_name = os.path.basename(fasta_file).split('.')[0]
        self.__size, self.__length = self.__fetch_alignment_size(fasta_file, compressed)
        traverse_columns = range(0, self.__length, nb_columns)
        self.__progress = tqdm(total=len(traverse_columns) * self.__size)
        try:
            for position in traverse_columns:
                self.__multialign += self.__fetch_column(fasta_file, position, nb_columns, compressed)
        finally:
            # Close the progress bar even when reading the FASTA file fails.
            self.__progress.close()

column_size_in_bytes(index)

Return the size in bytes of the SuccinctColumn objects at the index.

Parameters:

Name Type Description Default
index int

The column considered.

required

Returns:

Type Description
int

The size in bytes of the selected SuccinctColumn objects.

Source code in succinct_multiple_alignment.py
def column_size_in_bytes(self, index):
    """
    Give the size, in bytes, of a single column of the alignment.

    Parameters:
        index (int): The column considered.

    Returns:
        (int): The size in bytes reported by the SuccinctColumn object at `index`.
    """
    selected_column = self.__multialign[index]
    return selected_column.size_in_bytes()

find_columns_with_excessive_space(threshold_ratio=2)

Identifies columns that occupy significantly more space than the average column size.

Parameters:

Name Type Description Default
threshold_ratio float

The threshold ratio used to determine whether a column occupies significantly more space than the average column size. Default value is 2, meaning a column is considered to occupy significantly more space if its size is at least twice the average size.

2

Returns:

Type Description
tuple of int

The number of columns that occupy significantly more space than the average column size, followed by the total number of columns.

Source code in succinct_multiple_alignment.py
def find_columns_with_excessive_space(self, threshold_ratio=2):
    """
    Counts the columns that occupy significantly more space than the average column size.

    The average size, the total size and the size taken by the excessive columns are printed.

    Parameters:
        threshold_ratio (float, optional): The threshold ratio used to determine whether a column occupies significantly more space than the average column size.
            Default value is 2, meaning a column is considered to occupy significantly more space if its size is at least twice the average size.

    Returns:
        (tuple of int):
            - The number of columns whose size is at least `threshold_ratio` times the average size.
            - The total number of columns.
    """
    # Each size is computed once and reused for the average, the total and the threshold test.
    column_sizes = [succinct_column.size_in_bytes() for succinct_column in self.__multialign]
    size = sum(column_sizes)
    # An empty alignment has no average; 0.0 avoids a division by zero.
    average_size = size / len(column_sizes) if column_sizes else 0.0
    excessive_columns = [index for index, column_size in enumerate(column_sizes)
                         if column_size >= threshold_ratio * average_size]
    size_excessive = sum(column_sizes[index] for index in excessive_columns)
    print('moyenne:',average_size)
    print('total size :' ,size)
    print('excessive_columns size :', size_excessive)

    return len(excessive_columns), len(column_sizes)

get_column(index)

Returns the column at index index.

Parameters:

Name Type Description Default
index int

Index of the column (starts at 0).

required

Returns:

Type Description
SuccinctColumn

The column at the requested index.

Source code in succinct_multiple_alignment.py
def get_column(self, index):
    '''
    Returns the column at index `index`.

    Parameters:
        index (int): Index of the column (starts at 0).

    Returns:
        (SuccinctColumn):
            The column at the requested index.

    Raises:
        IndexError: If `index` is negative or not smaller than the number of columns.
    '''
    # A real exception rather than an assert: the check must survive `python -O`, and
    # IndexError is the error sequence-style access is expected to raise when out of range.
    if not 0 <= index < len(self.__multialign):
        raise IndexError("Index {} doesn't exist in the alignment".format(index))

    return self.__multialign[index]

get_consensus(ratio_min=0.5)

Computes and returns a consensus sequence.

Parameters:

Name Type Description Default
ratio_min float

The minimal ratio of the nucleotide in majority to add it to the consensus (N otherwise).

0.5
Complexity

\(\Omega(n)\), \(O(n\times s)\), with \(s\) the number of sequences and \(n\) the number of columns. On average, \(\Theta(n\times b)\), with \(b\) the average number of changes in nucleotides in a given column.

Returns:

Type Description
str

A consensus sequence.

Source code in succinct_multiple_alignment.py
def get_consensus(self, ratio_min=.5):
    '''
    Computes and returns a consensus sequence.

    For each column, the most frequent symbol is kept when its frequency is at least
    `ratio_min`; otherwise 'N' is emitted for that position. When several symbols share
    the highest frequency, the first one reported by the column is used.

    Parameters:
        ratio_min (float): The minimal ratio of the nucleotide in majority to add it to the consensus (N otherwise).

    Complexity:
        $\\Omega(n)$, $O(n\\times s)$, with $s$ the number of sequences and $n$ the number of columns.
        On average, $\\Theta(n\\times b)$, with $b$ the average number of changes in nucleotides in a given column.

    Returns:
        (str):
            A consensus sequence.
    '''
    consensus = []

    for i in range(len(self)):
        freq = self.get_column(i).nt_frequency()
        # Most frequent symbol of the column (max keeps the first one on ties).
        max_nt = max(freq, key=freq.get)
        consensus.append(max_nt if freq[max_nt] >= ratio_min else 'N')

    return ''.join(consensus)

get_info()

Return general information such as the alignment length (length of the sequences) and the alignment size (number of sequences).

Returns:

Type Description
int

The alignment length (length of the sequences).

int

The alignment size (number of sequences).

Source code in succinct_multiple_alignment.py
def get_info(self):
    """
    Give the two dimensions of the alignment.

    Returns:
        (int): The alignment length (length of the sequences).
        (int): The alignment size (number of sequences).
    """
    alignment_length = self.__length
    alignment_size = self.__size
    return alignment_length, alignment_size

get_kept_nucleotides(index)

Returns the nucleotides - used to deduce the column's sequence from the bit vector - of the 'index'-th column.

Parameters:

Name Type Description Default
index int

The column considered.

required

Returns:

Type Description
str

The nucleotides kept in the SuccinctColumn object corresponding to the 'index'-th column in the multiple alignment.

Source code in succinct_multiple_alignment.py
def get_kept_nucleotides(self, index):
    """
    Give the nucleotides stored alongside the bit vector of the 'index'-th column.

    Parameters:
        index (int): The column considered.

    Returns:
        (str): The nucleotides kept in the SuccinctColumn object corresponding to the 'index'-th column in the multiple alignment.
    """
    column = self.__multialign[index]
    return column.get_kept_nucleotides()

get_nb_sequences()

Returns:

Type Description
int

the number of sequences in the multiple alignment

Source code in succinct_multiple_alignment.py
def get_nb_sequences(self):
    '''
    Give the size of the alignment.

    Returns:
        (int): the number of sequences in the multiple alignment
    '''
    nb_sequences = self.__size
    return nb_sequences

get_nt(seq_index, position)

Return the nucleotide in the position specified in the sequence of index "seq_index".

Parameters:

Name Type Description Default
seq_index int

The index of the sequence to search in.

required
position int

The position to look at in the sequence.

required

Returns:

Type Description
str

The nucleotide in the position specified in the sequence of index "seq_index".

Source code in succinct_multiple_alignment.py
def get_nt(self, seq_index, position):
    """
    Look up a single nucleotide of the alignment.

    The column at `position` is selected first, then queried for the sequence `seq_index`.

    Parameters:
        seq_index (int): The index of the sequence to search in.
        position (int): The position to look at in the sequence.

    Returns:
        (str): The nucleotide in the position specified in the sequence of index "seq_index".
    """
    column = self.__multialign[position]
    return column.get_nt(seq_index)

get_sequence(seq_index)

Return the sequence of index "seq_index".

Parameters:

Name Type Description Default
seq_index int

The index of the sequence to search in.

required

Returns:

Type Description
str

The sequence of index "seq_index".

Source code in succinct_multiple_alignment.py
def get_sequence(self, seq_index):
    """
    Rebuild one full sequence of the alignment, column by column.

    Parameters:
        seq_index (int): The index of the sequence to search in.

    Returns:
        (str): The sequence of index "seq_index".
    """
    nucleotides = []
    for column_index in range(self.__length):
        nucleotides.append(self.get_nt(seq_index, column_index))
    return "".join(nucleotides)

get_vector(index)

Returns the SDVector object of a SuccinctColumn object corresponding to the 'index'-th column in the multiple alignment.

Parameters:

Name Type Description Default
index int

The column considered.

required

Returns:

Type Description
SDVector

The SDVector object corresponding to the compacted representation of the bit vector of the column specified.

Source code in succinct_multiple_alignment.py
def get_vector(self, index):
    """
    Give the succinct bit vector backing the 'index'-th column of the multiple alignment.

    Parameters:
        index (int): The column considered.

    Returns:
        (pysdsl.SDVector): The SDVector object corresponding to the compacted representation of the bit vector of the column specified.
            NOTE(review): a column may be backed by another vector class (see `get_vector_type()` on the column) - confirm.
    """
    column = self.__multialign[index]
    return column.get_vector()

load(input_file) staticmethod

Load a previously saved multiple alignment

Parameters:

Name Type Description Default
input_file str

The filename to load

required

Returns:

Type Description
SuccinctMultipleAlignment

The loaded SuccinctMultipleAlignment

Source code in succinct_multiple_alignment.py
@staticmethod
def load(input_file):
    '''
    Rebuild a multiple alignment from an archive written by store_to_file().

    The project name is the base name of `input_file` up to its first dot.

    Parameters:
        input_file (str): The filename to load

    Returns:
        (SuccinctMultipleAlignment): The loaded SuccinctMultipleAlignment
    '''
    restored = SuccinctMultipleAlignment(None)
    archive_name = os.path.basename(input_file)
    restored.__project_name = archive_name.split('.')[0]
    columns, nb_sequences, nb_columns, other_vector = restored.load_from_file(input_file)
    restored.__multialign = columns
    restored.__size = nb_sequences
    restored.__length = nb_columns
    restored.__other_vector = other_vector
    return restored

load_from_file(filename)

Create a SuccinctMultipleAlignment from the files produced by the store_to_file() function.

Parameters:

Name Type Description Default
filename str

The filename from which to recreate the saved SuccinctMultipleAlignment.

required

Returns:

Type Description
list

All the Succinct_columns

int

The number of sequences.

int

The length of the sequences (which is supposed to be the same for every sequence).

Source code in succinct_multiple_alignment.py
def load_from_file(self, filename):
    """
    Create a SuccinctMultipleAlignment from the files produced by the store_to_file() function.

    The archive is extracted with `tar` into a temporary directory, which is removed
    before returning, even when loading fails.

    Parameters:
        filename (str): The filename from which to recreate the saved SuccinctMultipleAlignment.

    Returns:
        (list): All the Succinct_columns
        (int): The number of sequences.
        (int): The length of the sequences (which is supposed to be the same for every sequence).
        (type): The vector class located from the name stored in info.txt, used for the columns that are not SDVector.
    """
    list_succinct_columns = []
    tmpdir = tempfile.mkdtemp()
    try:
        subprocess.call(['tar', '-zxf', '{}'.format(filename), '-C', '{}'.format(tmpdir)])
        # info.txt holds one line: "<nb sequences>,<nb columns>,<vector type name>".
        with open(tmpdir + '/info.txt') as fileIn:
            info = fileIn.readline().split(',')
        size = int(info[0])
        length = int(info[1])
        vtype = locate(info[2])
        # types.sd flags, for each column, whether it uses `vtype` (1) or SDVector (0).
        vector_types = pysdsl.SDVector.load_from_file(tmpdir + "/types.sd")
        # All columns read their nucleotides one after the other from the same open file,
        # so it must stay open for the whole loop and be closed afterwards.
        with open(tmpdir + '/nucleotides.txt') as nt_file:
            for i in range(length):
                current_type = pysdsl.SDVector if not vector_types[i] else vtype
                list_succinct_columns.append(
                    SuccinctColumn.load(tmpdir + "/{}_column".format(i), nt_file, current_type))
    finally:
        shutil.rmtree(tmpdir)
    return list_succinct_columns, size, length, vtype

size_in_bytes()

Return the size in bytes of the entire succinct multiple alignment (sum of the size in bytes of all the SuccinctColumn objects).

Returns:

Type Description
int

The size in bytes of the entire succinct multiple alignment.

Source code in succinct_multiple_alignment.py
def size_in_bytes(self):
    """
    Add up the sizes in bytes reported by every SuccinctColumn of the alignment.

    Returns:
        (int):
            The size in bytes of the entire succinct multiple alignment.
    """
    total = 0
    for succinct_column in self.__multialign:
        total += succinct_column.size_in_bytes()
    return total

size_to_csv(file_name='size.csv', sort_by_size=True)

Save the size in bytes of each SuccinctColumn object in a CSV file.

Parameters:

Name Type Description Default
file_name str

The name of the CSV file to save the sizes. Default is "size.csv".

'size.csv'
sort_by_size bool

If True, the sizes will be sorted in ascending order. Default is True.

True
Source code in succinct_multiple_alignment.py
def size_to_csv(self, file_name="size.csv", sort_by_size=True):
    """
    Save the size in bytes of each SuccinctColumn object in a CSV file.

    One row is written per column with: the column index, the column size, the running
    total of the sizes written so far, the size of the bit vector alone and the number
    of kept nucleotides.

    Parameters:
        file_name (str, optional): The name of the CSV file to save the sizes. Default is "size.csv".
        sort_by_size (bool, optional): If True, the sizes will be sorted in ascending order. Default is True.
    """
    # One record per column: (index, column size, vector size, number of kept nucleotides).
    sizes = []
    for i in range(self.__length):
        column = self.__multialign[i]
        sizes.append((i, column.size_in_bytes(), column.get_vector().size_in_bytes,
                      len(column.get_kept_nucleotides())))
    # Order the columns by increasing total size.
    if sort_by_size:
        sizes.sort(key=lambda x: x[1])
    # newline="" lets the csv module control line endings (avoids blank rows on Windows).
    with open(file_name, "w", newline="") as fileOut:
        writer = csv.writer(fileOut)
        writer.writerow(["Index", "column sorted by size", "cumulative column sizes", "vector size", "nt size"])
        cumulative_size = 0
        for i, size, vsize, ntsize in sizes:
            # Running total of the column sizes, in the order in which rows are written.
            cumulative_size += size
            writer.writerow([i, size, cumulative_size, vsize, ntsize])

store_to_file(output_file)

Store all the Succinct_column in the SuccinctMultipleAlignment, in a compressed directory

Parameters:

Name Type Description Default
output_file str

The path or the directory where the save will be created.

required
Source code in succinct_multiple_alignment.py
def store_to_file(self, output_file):
    """
    Store all the Succinct_column in the SuccinctMultipleAlignment, in a compressed directory

    The files are first written in a temporary directory, which is then packed with
    `tar -z` and moved to `output_file`. The temporary directory is removed afterwards,
    even when saving fails.

    Parameters:
        output_file (str): The path or the directory where the save will be created.
    """
    tmpdir = tempfile.mkdtemp()
    try:
        vector_types = pysdsl.BitVector(len(self))
        # info.txt: "<nb sequences>,<nb columns>,<alternative vector type>".
        with open(tmpdir + '/info.txt', 'w') as fileOut:
            fileOut.write('{},{},{}'.format(self.__size, self.__length, self.__other_vector))
        # Every column appends its kept nucleotides to the same file, in column order.
        with open(tmpdir + '/nucleotides.txt', 'w') as nt_file:
            for i, succinct_column in enumerate(self.__multialign):
                succinct_column.store_to_file(tmpdir + "/{}_column".format(i), nt_file)
                # Flag the columns that do not use an SDVector so the right class is used at load time.
                if succinct_column.get_vector_type() != pysdsl.SDVector:
                    vector_types[i] = 1
        vector_types = pysdsl.SDVector(vector_types)
        vector_types.store_to_file(tmpdir + "/types.sd")

        # The archive is created next to the temporary directory, then moved to its destination.
        subprocess.call(['tar', '-zcf', '{}.tar.gz'.format(tmpdir), '.'],
                        cwd=tmpdir)
        subprocess.call(['mv', '{}.tar.gz'.format(tmpdir), output_file])
    finally:
        shutil.rmtree(tmpdir)

vector_type(nb_changes, total_length, ratio, bv_type)

Returns the vector type to use depending on the number of changes in the bit vector (ie. the number of 1) and the total length

Parameters:

Name Type Description Default
nb_changes int

The number of 1s in the bit vector

required
total_length int

The length of the bit vector

required

Returns:

Type Description
type

A type corresponding to a bit vector type (e.g., pysdsl.SDVector).

Source code in succinct_multiple_alignment.py
def vector_type(nb_changes, total_length, ratio, bv_type):
    '''
    Choose the bit vector class to use from the number of changes in the bit vector
    (ie. the number of 1) and its total length.

    Parameters:
        nb_changes (int): The number of 1s in the bit vector
        total_length (int):  The length of the bit vector
        ratio (float): pysdsl.SDVector is chosen when nb_changes is strictly below ratio*total_length.
        bv_type (str): Name passed to `locate` to obtain the class used otherwise
            (presumably a dotted path such as a pysdsl class name - confirm against the caller).

    Returns:
         (type): A type corresponding to a bit vector type (e.g., pysdsl.SDVector).
    '''
    few_changes = nb_changes < ratio * total_length
    return pysdsl.SDVector if few_changes else locate(bv_type)

Succinct Column class

SuccinctColumn

Source code in succinct_column.py
class SuccinctColumn:
    """
    One column of a multiple alignment stored as a bit vector plus a string of nucleotides.

    A 1 in the bit vector marks the first sequence of a run of identical symbols, and the
    k-th kept nucleotide is the symbol of the k-th run (see nt_pos() and get_nt()).
    """

    def __init__(self, bitvector=None, nt_kept=None, vector=None):
        """
        Build a SDVector or a BitVector and a sequence of nucleotides (corresponding to the "1" in the bit sequence) from all the
        nucleotides in a column.

        When `bitvector` is empty or None, no vector is attached: the column is expected
        to be filled later on with load_from_file().

        Parameters:
            bitvector (pysdsl.BitVector): A bit vector corresponding to a simplified version of multiple alignment.
            nt_kept (str): Nucleotides corresponding to the '1' in the bit vector.
            vector (type): Selection of the class representing the bit vector.
        """
        self.__type = vector
        self.__nucleotides = nt_kept
        if bitvector:
            if not vector:
                self.__vector = bitvector
            else:
                self.__vector = vector(bitvector)
            self.__init_rank()

    def __init_rank(self):
        '''
        Init rank/select data structures
        '''
        self.__rank = self.__vector.init_rank_1()
        self.__select = self.__vector.init_select_1()


    @staticmethod
    def load(path, nt_file, vector_type):
        '''
        Build a column from files written by store_to_file().

        Parameters:
            path (str): Path to the file storing the bitvector of the SuccinctColumn.
            nt_file (str or Reader): The file from which nucleotides will be read.
            vector_type (type): The type of the vector.

        Returns:
            (SuccinctColumn): A column loaded from the file
        '''

        column = SuccinctColumn()
        column.__type = vector_type
        column.load_from_file(path, nt_file, vector_type)
        return column

    def __len__(self):
        """
        Returns:
            (int): The length of the bit vector.
        """
        return len(self.__vector)

    def size_in_bytes(self):
        """
        Return the size in bytes of the column: the size reported by the bit vector plus
        the number of kept nucleotides.

        Returns:
            (int): The size in bytes of the vector plus the number of kept nucleotides

        """
        return self.__vector.size_in_bytes + len(self.__nucleotides)

    def nt_counts(self):
        """
        Returns the counts of each nucleotide in the column.

        Returns:
            (dict): the counts of each nucleotide/symbol existing in the column
        """
        nt_pos_dict = self.nt_pos()

        # Each run is an inclusive (start, end) pair, hence the +1.
        return {nt: sum(end - start + 1 for start, end in poslist) for nt, poslist in nt_pos_dict.items()}

    def nt_frequency(self):
        """
        Returns the frequency of each nucleotide in the column, as a fraction between 0 and 1.

        Returns:
            (dict): the frequency of each nucleotide/symbol existing in the column
        """
        nt_count_dict = self.nt_counts()
        return {nt : count*1. / len(self.__vector) for nt, count in nt_count_dict.items()}

    def nt_pos(self):
        '''
        Get the start and end positions of each run of nucleotides

        Returns:
            (dict): keys are existing nucleotides and values are a list of tuples with start and end positions (inclusive)
        '''
        nt_pos_dict = defaultdict(list)
        nb_ones = 1
        previous_pos = 0
        pos=0

        while nb_ones < self.get_nb_changes():
            if nb_ones+1 < len(self):
                pos = self.__select(nb_ones+1)
            else:
                # Bug with select: when all the bits are at 1 we can't query the last
                pos = len(self)-1
            # The run of the nb_ones-th nucleotide ends right before the next 1.
            nt_pos_dict[self.__nucleotides[nb_ones-1]].append((previous_pos, pos-1))
            nb_ones += 1
            previous_pos = pos
        length_vector = len(self.__vector)

        # The last run extends to the end of the column.
        nt_pos_dict[self.__nucleotides[-1]].append((pos, length_vector-1))
        return nt_pos_dict


    def get_nt(self, position):
        """
        Returns the nucleotide at the position specified in the column (the p-th sequence in the alignment).

        Parameters:
            position (int): The position of the nucleotide in the column.

        Returns:
            (str): The target nucleotide.
        """
        length_vector = len(self.__vector)
        if position == length_vector - 1:
            # The last position always belongs to the last run: no rank query needed.
            nt = self.__nucleotides[-1]
        else:
            nt = self.__nucleotides[self.__rank(position + 1) - 1]
        return nt

    def get_nb_changes(self):
        '''
        Returns the number of changes in the vector

        Returns:
            (int): The number of positions with a 1 in the bit vector
        '''
        return len(self.get_kept_nucleotides())

    def get_pos_of_ones(self):
        '''
        Return the positions of the ones in the bit vector

        Returns:
            (set): The set of the positions where the value in the bit vector is  1
        '''
        pos_ones = set()
        # Position 0 is always reported: it starts the first run.
        pos_ones.add(0)
        nb_ones = 1
        pos=0

        while nb_ones < self.get_nb_changes():
            if nb_ones+1 < len(self):
                pos = self.__select(nb_ones+1)
            else:
                # Bug with select: when all the bits are at 1 we can't query the last
                pos = len(self)-1
            pos_ones.add(pos)
            nb_ones += 1

        return pos_ones

    def get_vector(self):
        """
        Returns the object corresponding to the compacted representation of the bit vector.

        Returns:
            (vector):
                The object (whose type corresponds to self.get_vector_type()) corresponding to the compacted representation of the bit vector.
        """
        return self.__vector

    def get_vector_type(self):
        '''
        Returns the type of the vector returned by self.get_vector().

        Returns:
            (type): The type of the bit vector
        '''
        return self.__type

    def get_kept_nucleotides(self):
        """
        Returns the nucleotides used to deduce the column's sequence from the bit vector.

        Returns:
            (str): The nucleotides kept.
        """
        return self.__nucleotides

    def store_to_file(self, bv_file, nt_file):
        """
        Store the SDVector and the nucleotides in two files.
        Do not use if the bit vector is represented by a pysdsl.BitVector.

        Parameters:
            bv_file (str): The path to the file that will store the bitvector.
            nt_file (str or Writer): The file in which nucleotides will be written.
                A path is opened (and truncated) then closed by this method; an already
                opened writer is left open so that the caller can keep appending to it.
        """
        self.__vector.store_to_file(bv_file)
        if isinstance(nt_file, str):
            # The file is ours: make sure it is closed even if the write fails.
            with open(nt_file, 'w') as nt_writer:
                nt_writer.write(self.__nucleotides)
        else:
            nt_file.write(self.__nucleotides)

    def load_from_file(self, bv_file, nt_file, vector_type):
        """
        Create a Succinct_column from the files produced by the store_to_file() function.

        Parameters:
            bv_file (str): The path to the file that stores the bitvector.
            nt_file (str or Reader): The file in which nucleotides will be read.
                A path is opened then closed by this method; an already opened reader is
                left open, positioned right after the nucleotides of this column.
            vector_type (type): The type of vector to load.

        Post:
            The inner attributes have been altered according to the content of the files
        """
        self.__vector = vector_type.load_from_file(bv_file)
        self.__init_rank()
        # Number of nucleotides to read: the rank at the last position plus the last bit itself.
        nb_nt = self.__rank.rank(len(self.__vector) - 1) + self.__vector[-1]
        if isinstance(nt_file, str):
            # The file is ours: close it once the nucleotides have been read.
            with open(nt_file) as nt_reader:
                self.__nucleotides = nt_reader.read(nb_nt)
        else:
            self.__nucleotides = nt_file.read(nb_nt)

__init__(bitvector=None, nt_kept=None, vector=None)

Build a SDVector or a BitVector and a sequence of nucleotides (corresponding to the "1" in the bit sequence) from all the nucleotides in a column.

Parameters:

Name Type Description Default
bitvector BitVector

A bit vector corresponding to a simplified version of multiple alignment.

None
nt_kept str

Nucleotides corresponding to the '1' in the bit vector.

None
vector type

Selection of the class representing the bit vector.

None
Source code in succinct_column.py
def __init__(self, bitvector=None, nt_kept=None, vector=None):
    """
    Record the nucleotides of a column and, when a bit vector is given, wrap it in the
    requested vector class and prepare its rank/select structures.

    Parameters:
        bitvector (pysdsl.BitVector): A bit vector corresponding to a simplified version of multiple alignment.
        nt_kept (str): Nucleotides corresponding to the '1' in the bit vector.
        vector (type): Selection of the class representing the bit vector.
    """
    self.__type = vector
    self.__nucleotides = nt_kept
    if not bitvector:
        # Nothing to wrap: the vector is expected to be set later (e.g. when loading from a file).
        return
    self.__vector = vector(bitvector) if vector else bitvector
    self.__init_rank()

__init_rank()

Init rank/select data structures

Source code in succinct_column.py
def __init_rank(self):
    '''
    Create the rank and select supports of the current vector and keep them on the column.
    '''
    vector = self.__vector
    self.__rank, self.__select = vector.init_rank_1(), vector.init_select_1()

get_kept_nucleotides()

Returns the nucleotides used to deduce the column's sequence from the bit vector.

Returns:

Type Description
str

The nucleotides kept.

Source code in succinct_column.py
def get_kept_nucleotides(self):
    """
    Give the nucleotides stored with the bit vector, from which the column's sequence is deduced.

    Returns:
        (str): The nucleotides kept.
    """
    kept = self.__nucleotides
    return kept

get_nb_changes()

Returns the number of changes in the vector

Returns:

Type Description
int

The number of positions with a 1 in the bit vector

Source code in succinct_column.py
def get_nb_changes(self):
    '''
    Count the changes in the vector, i.e. the number of kept nucleotides.

    Returns:
        (int): The number of positions with a 1 in the bit vector
    '''
    kept = self.get_kept_nucleotides()
    return len(kept)

get_nt(position)

Returns the nucleotide at the position specified in the column (the p-th sequence in the alignment).

Parameters:

Name Type Description Default
position int

The position of the nucleotide in the column.

required

Returns:

Type Description
str

The target nucleotide.

Source code in succinct_column.py
def get_nt(self, position):
    """
    Give the nucleotide found at `position` in the column (the p-th sequence in the alignment).

    Parameters:
        position (int): The position of the nucleotide in the column.

    Returns:
        (str): The target nucleotide.
    """
    kept = self.__nucleotides
    last_position = len(self.__vector) - 1
    if position != last_position:
        # The rank up to and including `position` identifies the kept nucleotide in effect there.
        return kept[self.__rank(position + 1) - 1]
    # The last position always maps to the last kept nucleotide.
    return kept[-1]

get_pos_of_ones()

Return the positions of the ones in the bit vector

Returns:

Type Description
set

The set of the positions where the value in the bit vector is 1

Source code in succinct_column.py
def get_pos_of_ones(self):
    '''
    Collect the positions of the ones in the bit vector.

    Position 0 is always part of the result.

    Returns:
        (set): The set of the positions where the value in the bit vector is  1
    '''
    length = len(self)
    positions = {0}

    # nth is the (1-based) number of the one being located; the first one is position 0.
    for nth in range(2, self.get_nb_changes() + 1):
        if nth < length:
            positions.add(self.__select(nth))
        else:
            # Bug with select: when all the bits are at 1 we can't query the last
            positions.add(length - 1)

    return positions

get_vector()

Returns the SDVector object corresponding to the compacted representation of the bit vector.

Returns:

Type Description
vector

The object (whose type corresponds to self.get_vector_type()) corresponding to the compacted representation of the bit vector.

Source code in succinct_column.py
def get_vector(self):
    """
    Give the object holding the compacted representation of the bit vector.

    Returns:
        (vector):
            The object (whose type corresponds to self.get_vector_type()) corresponding to the compacted representation of the bit vector.
    """
    vector = self.__vector
    return vector

get_vector_type()

Returns the type of the vector returned by self.get_vector().

Returns:

Type Description
type

The type of the bit vector

Source code in succinct_column.py
def get_vector_type(self):
    '''
    Give the class recorded for the vector returned by self.get_vector().

    Returns:
        (type): The type of the bit vector
    '''
    vector_class = self.__type
    return vector_class

load(path, nt_file, vector_type) staticmethod

Parameters:

Name Type Description Default
path str

Path to the file storing the bitvector of the SuccinctColumn.

required
nt_file str or Reader

The file from which nucleotides will be read.

required
vector_type type

The type of the vector.

required

Returns:

Type Description
SuccinctColumn

A column loaded from the file

Source code in succinct_column.py
@staticmethod
def load(path, nt_file, vector_type):
    '''
    Build a column from the files written by store_to_file().

    Parameters:
        path (str): Path to the file storing the bitvector of the SuccinctColumn.
        nt_file (str or Reader): The file from which nucleotides will be read.
        vector_type (type): The type of the vector.

    Returns:
        (SuccinctColumn): A column loaded from the file
    '''

    restored = SuccinctColumn()
    restored.__type = vector_type
    restored.load_from_file(path, nt_file, vector_type)
    return restored

load_from_file(bv_file, nt_file, vector_type)

Create a Succinct_column from the files produced by the store_to_file() function.

Parameters:

Name Type Description Default
bv_file str

The path to the file that stores the bitvector.

required
nt_file str or Reader

The file in which nucleotides will be read.

required
vector_type type

The type of vector to load.

required
Post

The inner attributes have been altered according to the content of the files

Source code in succinct_column.py
def load_from_file(self, bv_file, nt_file, vector_type):
    """
    Create a Succinct_column from the files produced by the store_to_file() function.

    Parameters:
        bv_file (str): The path to the file that stores the bitvector.
        nt_file (str or Reader): The file in which nucleotides will be read.
            A path is opened then closed by this method; an already opened reader is
            left open, positioned right after the nucleotides of this column.
        vector_type (type): The type of vector to load.

    Post:
        The inner attributes have been altered according to the content of the files
    """
    self.__vector = vector_type.load_from_file(bv_file)
    self.__init_rank()
    # Number of nucleotides to read: the rank at the last position plus the last bit itself.
    nb_nt = self.__rank.rank(len(self.__vector) - 1) + self.__vector[-1]
    if isinstance(nt_file, str):
        # The file is ours: close it once the nucleotides have been read.
        with open(nt_file) as nt_reader:
            self.__nucleotides = nt_reader.read(nb_nt)
    else:
        self.__nucleotides = nt_file.read(nb_nt)

nt_counts()

Returns the counts of each nucleotide in the column.

Returns:

Type Description
dict

the counts of each nucleotide/symbol existing in the column

Source code in succinct_column.py
def nt_counts(self):
    """
    Count the occurrences of each nucleotide in the column by adding up the lengths of its runs.

    Returns:
        (dict): the counts of each nucleotide/symbol existing in the column
    """
    counts = {}
    for nt, runs in self.nt_pos().items():
        # Runs are inclusive (start, end) pairs, hence the +1.
        counts[nt] = sum([end - start + 1 for start, end in runs])
    return counts

nt_frequency()

Returns the frequency of each nucleotide in the column, as a fraction between 0 and 1.

Returns:

Type Description
dict

the percentage of each nucleotide/symbol existing in the column

Source code in succinct_column.py
def nt_frequency(self):
    """
    Give the frequency of each nucleotide in the column: its count divided by the length of the vector.

    Returns:
        (dict): the frequency (between 0 and 1) of each nucleotide/symbol existing in the column
    """
    frequencies = {}
    for nt, count in self.nt_counts().items():
        frequencies[nt] = count * 1. / len(self.__vector)
    return frequencies

nt_pos()

Get the start and end positions of each run of nucleotides

Returns:

Type Description
dict

keys are existing nucleotides and values are a list of tuples with start and end positions (inclusive)

Source code in succinct_column.py
def nt_pos(self):
    '''
    Locate every run of identical nucleotides in the column.

    Returns:
        (dict): keys are existing nucleotides and values are a list of tuples with start and end positions (inclusive)
    '''
    runs = defaultdict(list)
    kept = self.__nucleotides
    run_start = 0

    # nth is the (1-based) number of the one that starts the next run.
    for nth in range(2, self.get_nb_changes() + 1):
        if nth < len(self):
            next_start = self.__select(nth)
        else:
            # Bug with select: when all the bits are at 1 we can't query the last
            next_start = len(self) - 1
        # The current run stops right before the next one begins.
        runs[kept[nth - 2]].append((run_start, next_start - 1))
        run_start = next_start

    # The last run extends to the end of the column.
    last_position = len(self.__vector) - 1
    runs[kept[-1]].append((run_start, last_position))
    return runs

size_in_bytes()

Return the size in bytes of the pysdsl vector representing the column of nucleotides.

Returns:

Type Description
int

The size in bytes of the pysdsl vector representing the column of nucleotides

Source code in succinct_column.py
def size_in_bytes(self):
    """
    Give the size in bytes of the column: the size reported by the pysdsl vector plus
    the number of kept nucleotides.

    Returns:
        (int): The size in bytes of the pysdsl vector plus the number of kept nucleotides

    """
    vector_size = self.__vector.size_in_bytes
    nb_nucleotides = len(self.__nucleotides)
    return vector_size + nb_nucleotides

store_to_file(bv_file, nt_file)

Store the SDVector and the nucleotides in two files. Do not use if the bit vector is represented by a pysdsl.BitVector.

Parameters:

Name Type Description Default
bv_file str

The path to the file that will store the bitvector.

required
nt_file str or Writer

The file in which nucleotides will be written.

required
Source code in succinct_column.py
def store_to_file(self, bv_file, nt_file):
    """
    Store the SDVector and the nucleotides in two files.
    Do not use if the bit vector is represented by a pysdsl.BitVector.

    Parameters:
        bv_file (str): The path to the file that will store the bitvector.
        nt_file (str or Writer): The file in which nucleotides will be written.
            A path is opened (and truncated) then closed by this method; an already
            opened writer is left open so that the caller can keep appending to it.
    """
    self.__vector.store_to_file(bv_file)
    if isinstance(nt_file, str):
        # The file is ours: make sure it is closed even if the write fails.
        with open(nt_file, 'w') as nt_writer:
            nt_writer.write(self.__nucleotides)
    else:
        nt_file.write(self.__nucleotides)