import json
import warnings
import numpy as np
from pydantic import DirectoryPath
from ..baserecordingextractorinterface import BaseRecordingExtractorInterface
from ..basesortingextractorinterface import BaseSortingExtractorInterface
from ....utils import DeepDict, dict_deep_update
class NeuralynxRecordingInterface(BaseRecordingExtractorInterface):
    """
    Primary data interface for converting Neuralynx data.

    Uses :py:func:`~spikeinterface.extractors.read_neuralynx` from SpikeInterface.
    """

    display_name = "Neuralynx Recording"
    # Fixed: the original tuple listed ".nse" twice.
    associated_suffixes = (".ncs", ".nse", ".ntt", ".nev")
    info = "Interface for Neuralynx recording data."

    @classmethod
    def get_stream_names(cls, folder_path: DirectoryPath) -> list[str]:
        """
        Return the names of the data streams available in a Neuralynx folder.

        Parameters
        ----------
        folder_path : DirectoryPath
            Path to the Neuralynx directory.

        Returns
        -------
        list of str
            The available stream names, as reported by SpikeInterface.
        """
        from spikeinterface.extractors.extractor_classes import (
            NeuralynxRecordingExtractor,
        )

        stream_names, _ = NeuralynxRecordingExtractor.get_streams(folder_path=folder_path)
        return stream_names

    @classmethod
    def get_source_schema(cls) -> dict:
        """Return the source schema with a Neuralynx-specific description for folder_path."""
        source_schema = super().get_source_schema()
        # Fixed: the original description repeated ".nse".
        source_schema["properties"]["folder_path"][
            "description"
        ] = 'Path to Neuralynx directory containing ".ncs", ".nse", ".ntt", or ".nev" files.'
        return source_schema

    def __init__(
        self,
        folder_path: DirectoryPath,
        *args,  # TODO: change to * (keyword only) on or after August 2026
        stream_name: str | None = None,
        verbose: bool = False,
        es_key: str = "ElectricalSeries",
    ):
        """
        Initialize reading of a Neuralynx recording.

        Parameters
        ----------
        folder_path : DirectoryPath
            Path to Neuralynx directory.
        stream_name : str, optional
            The name of the recording stream to load; only required if there is more than one stream detected.
            Call `NeuralynxRecordingInterface.get_stream_names(folder_path=...)` to see what streams are available.
        verbose : bool, default: False
        es_key : str, default: "ElectricalSeries"
        """
        # Handle deprecated positional arguments
        if args:
            parameter_names = [
                "stream_name",
                "verbose",
                "es_key",
            ]
            num_positional_args_before_args = 1  # folder_path
            if len(args) > len(parameter_names):
                raise TypeError(
                    f"__init__() takes at most {len(parameter_names) + num_positional_args_before_args + 1} positional arguments but "
                    f"{len(args) + num_positional_args_before_args + 1} were given. "
                    "Note: Positional arguments are deprecated and will be removed on or after August 2026. "
                    "Please use keyword arguments."
                )
            positional_values = dict(zip(parameter_names, args))
            passed_as_positional = list(positional_values.keys())
            warnings.warn(
                f"Passing arguments positionally to NeuralynxRecordingInterface.__init__() is deprecated "
                f"and will be removed on or after August 2026. "
                f"The following arguments were passed positionally: {passed_as_positional}. "
                "Please use keyword arguments instead.",
                FutureWarning,
                stacklevel=2,
            )
            stream_name = positional_values.get("stream_name", stream_name)
            verbose = positional_values.get("verbose", verbose)
            es_key = positional_values.get("es_key", es_key)

        super().__init__(
            folder_path=folder_path,
            stream_name=stream_name,
            verbose=verbose,
            es_key=es_key,
        )

        # convert properties of object dtype (e.g. datetime) and bool as these are not supported by nwb
        for key in self.recording_extractor.get_property_keys():
            value = self.recording_extractor.get_property(key)
            if value.dtype == object or value.dtype == np.bool_:
                self.recording_extractor.set_property(key, np.asarray(value, dtype=str))
class NeuralynxSortingInterface(BaseSortingExtractorInterface):
    """
    Primary data interface for converting Neuralynx sorting data. Uses
    :py:func:`~spikeinterface.extractors.read_neuralynx_sorting`.
    """

    display_name = "Neuralynx Sorting"
    # Fixed: the original tuple listed ".nse" twice.
    associated_suffixes = (".nse", ".ntt", ".nev")
    info = "Interface for Neuralynx sorting data."

    def __init__(
        self,
        folder_path: DirectoryPath,
        *args,  # TODO: change to * (keyword only) on or after August 2026
        sampling_frequency: float | None = None,
        verbose: bool = False,
        stream_id: str | None = None,
    ):
        """
        Initialize reading of Neuralynx sorting data.

        Parameters
        ----------
        folder_path : str, Path
            The path to the folder/directory containing the data files for the session (nse, ntt, nev)
        sampling_frequency : float, optional
            If a specific sampling_frequency is desired it can be set with this argument.
        verbose : bool, default: False
            Enables verbosity
        stream_id : str, optional
            Used by Spikeinterface and neo to calculate the t_start, if not provided and the stream is unique
            it will be chosen automatically
        """
        # Handle deprecated positional arguments
        if args:
            parameter_names = [
                "sampling_frequency",
                "verbose",
                "stream_id",
            ]
            num_positional_args_before_args = 1  # folder_path
            if len(args) > len(parameter_names):
                raise TypeError(
                    f"__init__() takes at most {len(parameter_names) + num_positional_args_before_args + 1} positional arguments but "
                    f"{len(args) + num_positional_args_before_args + 1} were given. "
                    "Note: Positional arguments are deprecated and will be removed on or after August 2026. "
                    "Please use keyword arguments."
                )
            positional_values = dict(zip(parameter_names, args))
            passed_as_positional = list(positional_values.keys())
            warnings.warn(
                f"Passing arguments positionally to NeuralynxSortingInterface.__init__() is deprecated "
                f"and will be removed on or after August 2026. "
                f"The following arguments were passed positionally: {passed_as_positional}. "
                "Please use keyword arguments instead.",
                FutureWarning,
                stacklevel=2,
            )
            sampling_frequency = positional_values.get("sampling_frequency", sampling_frequency)
            verbose = positional_values.get("verbose", verbose)
            stream_id = positional_values.get("stream_id", stream_id)

        super().__init__(
            folder_path=folder_path, sampling_frequency=sampling_frequency, stream_id=stream_id, verbose=verbose
        )
def _dict_intersection(dict_list: list[dict]) -> dict:
"""
Intersect dict_list and return only common keys and values
Parameters
----------
dict_list: list of dictionaries each representing a header
Returns
-------
dict:
Dictionary containing key-value pairs common to all input dictionary_list
"""
# Collect keys appearing in all dictionaries
common_keys = list(set.intersection(*[set(h.keys()) for h in dict_list]))
# Add values for common keys if the value is identical across all headers (dict_list)
first_dict = dict_list[0]
all_dicts_have_same_value_for = lambda key: all([first_dict[key] == dict[key] for dict in dict_list])
common_header = {key: first_dict[key] for key in common_keys if all_dicts_have_same_value_for(key)}
return common_header