import json
import warnings
from datetime import datetime, timedelta
from pathlib import Path
from warnings import warn
from pydantic import FilePath, validate_call
from ..baseicephysinterface import BaseIcephysInterface
from ....utils import DeepDict
[docs]
def get_start_datetime(neo_reader):
    """
    Get the recording start datetime from an ABF file.

    Prefers the explicit start fields written by pClamp ("uFileStartDate",
    an integer encoded as YYYYMMDD, and "uFileStartTimeMS", milliseconds
    since midnight). Falls back to Neo's parsed "rec_datetime" when those
    fields are absent, which may be less reliable.

    Parameters
    ----------
    neo_reader : neo.io.AxonIO
        The Neo reader object for the ABF file.

    Returns
    -------
    datetime
        The start date and time of the recording.
    """
    if all(k in neo_reader._axon_info for k in ["uFileStartDate", "uFileStartTimeMS"]):
        start_date = datetime.strptime(str(neo_reader._axon_info["uFileStartDate"]), "%Y%m%d")
        # Round to whole seconds; sub-second precision is not preserved.
        start_time = timedelta(seconds=round(neo_reader._axon_info["uFileStartTimeMS"] / 1000))
        return start_date + start_time
    else:
        # Path(...).name is portable across separators, unlike split('/').
        warn(
            f"uFileStartDate or uFileStartTimeMS not found in {Path(neo_reader.filename).name}, datetime for "
            "recordings might be wrongly stored."
        )
        return neo_reader._axon_info["rec_datetime"]
[docs]
class AbfInterface(BaseIcephysInterface):
    """Interface for ABF intracellular electrophysiology data."""

    display_name = "ABF Icephys"
    associated_suffixes = (".abf",)
    info = "Interface for ABF intracellular electrophysiology data."

    def _initialize_extractor(self, interface_kwargs: dict):
        """
        Instantiate the extractor for this interface.

        Only a single file name is forwarded to the extractor: an explicit
        'filename' takes precedence; otherwise the first entry of
        'file_paths' is used.
        """
        self.extractor_kwargs = {}
        if "filename" in interface_kwargs:
            self.extractor_kwargs["filename"] = interface_kwargs["filename"]
        elif "file_paths" in interface_kwargs and interface_kwargs["file_paths"]:
            # Only the first file initializes the extractor; the remaining
            # files are handled via the readers created by the base class.
            self.extractor_kwargs["filename"] = interface_kwargs["file_paths"][0]

        extractor_class = self.get_extractor_class()
        extractor_instance = extractor_class(**self.extractor_kwargs)
        return extractor_instance

    @classmethod
    def get_source_schema(cls) -> dict:
        """Return the base source schema extended with ABF-specific properties."""
        source_schema = super().get_source_schema()
        source_schema["properties"]["file_paths"] = dict(
            type="array",
            minItems=1,
            items={"type": "string", "format": "file"},
            description="Array of paths to ABF files.",
        )
        source_schema["properties"]["icephys_metadata"] = dict(
            type="object", description="Metadata for this experiment."
        )
        source_schema["properties"]["icephys_metadata_file_path"] = dict(
            type="string", format="file", description="Path to JSON file containing metadata for this experiment."
        )
        return source_schema

    @validate_call
    def __init__(
        self,
        file_paths: list[FilePath],
        *args,  # TODO: change to * (keyword only) on or after August 2026
        icephys_metadata: dict | None = None,
        icephys_metadata_file_path: FilePath | None = None,
    ):
        """
        ABF IcephysInterface based on Neo AxonIO.

        Parameters
        ----------
        file_paths : list of FilePaths
            List of files to be converted to the same NWB file.
        icephys_metadata : dict, optional
            Dictionary containing the Icephys-specific metadata.
        icephys_metadata_file_path : FilePath, optional
            JSON file containing the Icephys-specific metadata.
        """
        # Handle deprecated positional arguments: callers may still pass
        # icephys_metadata / icephys_metadata_file_path positionally until
        # the August 2026 removal date.
        if args:
            parameter_names = [
                "icephys_metadata",
                "icephys_metadata_file_path",
            ]
            num_positional_args_before_args = 1  # file_paths
            if len(args) > len(parameter_names):
                # +1 in both counts accounts for the implicit 'self'.
                raise TypeError(
                    f"__init__() takes at most {len(parameter_names) + num_positional_args_before_args + 1} positional arguments but "
                    f"{len(args) + num_positional_args_before_args + 1} were given. "
                    "Note: Positional arguments are deprecated and will be removed on or after August 2026. "
                    "Please use keyword arguments."
                )

            # Map the positional values onto their parameter names, in order.
            positional_values = dict(zip(parameter_names, args))
            passed_as_positional = list(positional_values.keys())

            warnings.warn(
                f"Passing arguments positionally to AbfInterface.__init__() is deprecated "
                f"and will be removed on or after August 2026. "
                f"The following arguments were passed positionally: {passed_as_positional}. "
                "Please use keyword arguments instead.",
                FutureWarning,
                stacklevel=2,
            )

            # Positional values take precedence over the keyword defaults.
            icephys_metadata = positional_values.get("icephys_metadata", icephys_metadata)
            icephys_metadata_file_path = positional_values.get("icephys_metadata_file_path", icephys_metadata_file_path)

        super().__init__(file_paths=file_paths)
        self.source_data.update(
            icephys_metadata=icephys_metadata,
            icephys_metadata_file_path=icephys_metadata_file_path,
        )

    def set_aligned_starting_time(self, aligned_starting_time: float):
        """
        Shift the starting time of every segment of every reader by a common offset.

        Parameters
        ----------
        aligned_starting_time : float
            Offset in seconds, relative to the common 'session_start_time',
            added to each segment's existing start time.
        """
        for reader in self.readers_list:
            number_of_segments = reader.header["nb_segment"][0]
            for segment_index in range(number_of_segments):
                reader._t_starts[segment_index] += aligned_starting_time

    def set_aligned_segment_starting_times(
        self, aligned_segment_starting_times: list[list[float]], stub_test: bool = False
    ):
        """
        Align the individual starting time for each recording in this interface relative to the common session start time.

        Must be in units seconds relative to the common 'session_start_time'.

        Parameters
        ----------
        aligned_segment_starting_times : list of list of floats
            The relative starting times of each recording.
            Outer list is over file paths (readers).
            Inner list is over segments of each recording.
        stub_test : bool, default=False
        """
        number_of_files_from_starting_times = len(aligned_segment_starting_times)
        # f-prefix required so the reader count is interpolated into the message.
        assert number_of_files_from_starting_times == len(self.readers_list), (
            f"The length of the outer list of 'starting_times' ({number_of_files_from_starting_times}) "
            f"does not match the number of files ({len(self.readers_list)})!"
        )

        for file_index, (reader, aligned_segment_starting_times_by_file) in enumerate(
            zip(self.readers_list, aligned_segment_starting_times)
        ):
            number_of_segments = reader.header["nb_segment"][0]
            assert number_of_segments == len(
                aligned_segment_starting_times_by_file
            ), f"The length of starting times index {file_index} does not match the number of segments of that reader!"

            # Replace (not shift) the per-segment start times for this reader.
            reader._t_starts = aligned_segment_starting_times_by_file