Source code for neuroconv.datainterfaces.ecephys.spikeglx.spikeglxconverter
from pathlib import Path
from pydantic import DirectoryPath, validate_call
from .spikeglxdatainterface import SpikeGLXRecordingInterface
from .spikeglxnidqinterface import SpikeGLXNIDQInterface
from .spikeglxsyncchannelinterface import SpikeGLXSyncChannelInterface
from ....nwbconverter import ConverterPipe
from ....utils import get_json_schema_from_method_signature
[docs]
class SpikeGLXConverterPipe(ConverterPipe):
"""
The simplest, easiest to use class for converting all SpikeGLX data in a folder.
Primary conversion class for handling multiple SpikeGLX data streams.
"""
display_name = "SpikeGLX Converter"
keywords = (
SpikeGLXRecordingInterface.keywords + SpikeGLXNIDQInterface.keywords + SpikeGLXSyncChannelInterface.keywords
)
associated_suffixes = (
SpikeGLXRecordingInterface.associated_suffixes
+ SpikeGLXNIDQInterface.associated_suffixes
+ SpikeGLXSyncChannelInterface.associated_suffixes
)
info = "Converter for multi-stream SpikeGLX recording data."
[docs]
@classmethod
def get_source_schema(cls) -> dict:
"""
Get the schema for the source arguments.
Returns
-------
dict
The schema dictionary containing input parameters and descriptions
for initializing the SpikeGLX converter.
"""
source_schema = get_json_schema_from_method_signature(method=cls.__init__, exclude=["streams"])
source_schema["properties"]["folder_path"]["description"] = "Path to the folder containing SpikeGLX streams."
return source_schema
[docs]
@classmethod
def get_streams(cls, folder_path: DirectoryPath) -> list[str]:
"""
Get the stream IDs available in the folder.
Parameters
----------
folder_path : DirectoryPath
Path to the folder containing SpikeGLX streams.
Returns
-------
list of str
The IDs of all available streams in the folder.
"""
from spikeinterface.extractors.extractor_classes import (
SpikeGLXRecordingExtractor,
)
# The first entry is the stream ids the second is the stream names
return SpikeGLXRecordingExtractor.get_streams(folder_path=folder_path)[0]
@validate_call
def __init__(
self,
folder_path: DirectoryPath,
streams: list[str] | None = None,
verbose: bool = False,
):
"""
Read all data from multiple streams stored in the SpikeGLX format.
This can include...
(a) single-probe but multi-band such as AP+LF streams
(b) multi-probe with multi-band
(c) with or without the associated NIDQ channels
(d) synchronization channels from Neuropixel probes (one per probe, preferring AP over LF)
Parameters
----------
folder_path : DirectoryPath
Path to folder containing the NIDQ stream and subfolders containing each IMEC stream.
streams : list of strings, optional
A specific list of streams you wish to load.
To see which streams are available, run `SpikeGLXConverter.get_streams(folder_path="path/to/spikeglx")`.
By default, all available streams are loaded, including sync channels (one per probe).
To exclude sync channels, explicitly pass a list of streams without the '-SYNC' streams.
verbose : bool, default: False
Whether to output verbose text.
"""
folder_path = Path(folder_path)
streams_ids = streams or self.get_streams(folder_path=folder_path)
# Store stream IDs as private attribute for external access
self._stream_ids = streams_ids
data_interfaces = dict()
nidq_streams = [stream_id for stream_id in streams_ids if stream_id == "nidq"]
sync_streams = [stream_id for stream_id in streams_ids if "SYNC" in stream_id]
stream_is_neural = lambda stream_id: stream_id not in nidq_streams and stream_id not in sync_streams
neural_streams = [stream_id for stream_id in streams_ids if stream_is_neural(stream_id)]
# Add neural stream interfaces
for stream_id in neural_streams:
data_interfaces[stream_id] = SpikeGLXRecordingInterface(folder_path=folder_path, stream_id=stream_id)
# Add NIDQ interface
for stream_id in nidq_streams:
data_interfaces[stream_id] = SpikeGLXNIDQInterface(folder_path=folder_path)
# Add sync channel interfaces (one per probe, preferring AP over LF)
# Group sync streams by probe device (e.g., "imec0", "imec1")
probe_sync_map = {}
for stream_id in sync_streams:
if "imec" in stream_id:
# Extract probe device: "imec0.ap-SYNC" -> "imec0"
probe_device = stream_id.replace("-SYNC", "").split(".")[0]
if probe_device not in probe_sync_map:
probe_sync_map[probe_device] = []
probe_sync_map[probe_device].append(stream_id)
# For each probe, select one sync stream (prefer AP over LF via alphabetical sort)
for probe_device, sync_stream_list in probe_sync_map.items():
sync_stream_list.sort() # "ap" < "lf" alphabetically, so AP comes first
selected_sync_stream = sync_stream_list[0]
data_interfaces[selected_sync_stream] = SpikeGLXSyncChannelInterface(
folder_path=folder_path,
stream_id=selected_sync_stream,
metadata_key=f"SpikeGLXSync{selected_sync_stream}",
)
super().__init__(data_interfaces=data_interfaces, verbose=verbose)
[docs]
def get_conversion_options_schema(self) -> dict:
conversion_options_schema = super().get_conversion_options_schema()
conversion_options_schema["properties"].update(
{name: interface.get_conversion_options_schema() for name, interface in self.data_interface_objects.items()}
)
return conversion_options_schema