/usr/share/pyshared/neo/io/brainvisionio.py is in python-neo 0.3.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 | # -*- coding: utf-8 -*-
"""
Class for reading data from BrainVision products.
This code was originally written by L. Pezard (2010) and modified by B. Burle and S. More.
Supported : Read
Author: sgarcia
"""
import os
import re
import numpy as np
import quantities as pq
from neo.io.baseio import BaseIO
from neo.core import Segment, AnalogSignal, EventArray
from neo.io.tools import create_many_to_one_relationship
class BrainVisionIO(BaseIO):
    """
    Class for reading data from BrainVision products (brainAmp, brain analyser...).

    ``filename`` must point to the text header file (``.vhdr``). The binary
    data file (``.eeg``) and the marker file (``.vmrk``) are looked up next to
    it, using the same base name with the extension swapped.

    Usage:
        >>> from neo import io
        >>> r = io.BrainVisionIO(filename='File_brainvision_1.vhdr')
        >>> seg = r.read_segment(lazy=False, cascade=True)
    """

    is_readable = True
    is_writable = False

    supported_objects = [Segment, AnalogSignal, EventArray]
    readable_objects = [Segment]
    writeable_objects = []

    has_header = False
    is_streameable = False

    read_params = {Segment: []}
    write_params = {Segment: []}

    name = None
    extensions = ['vhdr']

    mode = 'file'

    def __init__(self, filename=None):
        """
        Create a reader for a BrainVision recording.

        **Arguments**
            filename : the filename of the header file (.vhdr) to read
        """
        BaseIO.__init__(self)
        self.filename = filename

    def read_segment(self, lazy=False, cascade=True):
        """
        Read the whole recording into a single Segment.

        **Arguments**
            lazy : if True, the binary file is not read and every
                AnalogSignal / EventArray is created empty with
                ``lazy_shape = -1``
            cascade : if False, return the bare Segment without any
                signal or event attached

        **Returns**
            a Segment holding one AnalogSignal per channel and one
            EventArray per distinct marker type

        **Raises**
            NotImplementedError : if the header declares a data format,
                data orientation or binary format other than BINARY /
                MULTIPLEXED / INT_16 or IEEE_FLOAT_32
        """
        # Read header file (vhdr)
        header = readBrainSoup(self.filename)
        common = header['Common Infos']

        # Explicit raises instead of ``assert``: asserts are stripped when
        # Python runs with -O and would raise AssertionError, not the
        # NotImplementedError that was clearly intended.
        if common['DataFormat'] != 'BINARY':
            raise NotImplementedError(
                'DataFormat %r is not supported' % (common['DataFormat'],))
        if common['DataOrientation'] != 'MULTIPLEXED':
            raise NotImplementedError(
                'DataOrientation %r is not supported'
                % (common['DataOrientation'],))

        nb_channel = int(common['NumberOfChannels'])
        # The 1e6 factor treats SamplingInterval as a duration in microseconds.
        sampling_rate = 1.e6 / float(common['SamplingInterval']) * pq.Hz

        fmt = header['Binary Infos']['BinaryFormat']
        fmts = {'INT_16': np.int16, 'IEEE_FLOAT_32': np.float32}
        if fmt not in fmts:
            raise NotImplementedError(
                'BinaryFormat %r is not supported' % (fmt,))
        dt = fmts[fmt]

        seg = Segment(file_origin=os.path.basename(self.filename))
        if not cascade:
            return seg

        base_name = os.path.splitext(self.filename)[0]

        # Read binary data: samples are interleaved channel by channel, so
        # after dropping a possibly incomplete trailing frame the flat array
        # is reshaped to (n_samples, nb_channel).
        if not lazy:
            binary_file = base_name + '.eeg'
            sigs = np.memmap(binary_file, dt, 'r').astype('f')
            n = int(sigs.size // nb_channel)
            sigs = sigs[:n * nb_channel]
            sigs = sigs.reshape(n, nb_channel)

        for c in range(nb_channel):
            channel_line = header['Channel Infos']['Ch%d' % (c + 1,)]
            # NOTE(review): ``res`` (third field) is parsed but never applied
            # to the samples -- confirm whether raw values should be scaled.
            name, ref, res, units = channel_line.split(',')
            # The micro sign is not understood as a unit prefix, use 'u'.
            units = pq.Quantity(1, units.replace('µ', 'u'))
            if lazy:
                signal = [] * units
            else:
                signal = sigs[:, c] * units
            anasig = AnalogSignal(signal=signal,
                                  channel_index=c,
                                  name=name,
                                  sampling_rate=sampling_rate,
                                  )
            if lazy:
                anasig.lazy_shape = -1
            seg.analogsignals.append(anasig)

        # Read markers (vmrk)
        marker_file = base_name + '.vmrk'
        all_info = readBrainSoup(marker_file)['Marker Infos']

        # Only consider ``Mk<n>`` entries and visit them in numeric order, so
        # that unrelated keys or gaps in the numbering do not raise KeyError.
        marker_key = re.compile(r'Mk(\d+)$')
        numbered = []
        for key, value in all_info.items():
            found = marker_key.match(key)
            if found:
                numbered.append((int(found.group(1)), value))
        numbered.sort()

        all_types = []
        times = []
        labels = []
        for _, value in numbered:
            # Fields: type, description, position (in samples), size, channel
            type_, label, pos = value.split(',')[:3]
            all_types.append(type_)
            times.append(float(pos) / sampling_rate.magnitude)
            labels.append(label)
        all_types = np.array(all_types)
        times = np.array(times) * pq.s
        labels = np.array(labels, dtype='S')

        # One EventArray per marker type
        for type_ in np.unique(all_types):
            ind = type_ == all_types
            if lazy:
                ea = EventArray(name=str(type_))
                ea.lazy_shape = -1
            else:
                ea = EventArray(times=times[ind],
                                labels=labels[ind],
                                name=str(type_),
                                )
            seg.eventarrays.append(ea)

        create_many_to_one_relationship(seg)
        return seg
def readBrainSoup(filename):
    """
    Parse a BrainVision text file (header or marker file) into nested dicts.

    The file is read line by line:

    * a line starting with ``[`` opens a new section named by the text
      between the brackets (re-opening a section resets its content);
    * a line starting with ``;`` is a comment and is skipped;
    * any other line containing ``=`` is stored as ``key=value`` in the
      current section, splitting on the FIRST ``=`` so that values which
      themselves contain ``=`` are kept intact;
    * ``key=value`` lines found before any section header are ignored.

    **Arguments**
        filename : path of the text file to parse

    **Returns**
        a dict mapping section name -> dict of key -> value (all strings)

    **Raises**
        IndexError : if a line starts with ``[`` but holds no ``[name]``
    """
    import sys

    # The 'U' flag only matters on Python 2; on Python 3 universal newlines
    # are the default in text mode and 'U' was removed in Python 3.11.
    mode = 'rU' if sys.version_info[0] < 3 else 'r'

    section = None
    all_info = {}
    # ``with`` guarantees the file handle is closed, even on error.
    with open(filename, mode) as f:
        for line in f:
            line = line.strip('\n').strip('\r')
            if line.startswith('['):
                section = re.findall(r'\[([\S ]+)\]', line)[0]
                all_info[section] = {}
                continue
            if line.startswith(';'):
                continue
            if section is None or '=' not in line:
                continue
            k, _, v = line.partition('=')
            all_info[section][k] = v
    return all_info
|