import os import pydicom as dicom import argparse import pathlib import urllib.parse import datalad.api as dlad import shutil import gitlab GITLAB_REMOTE_NAME = os.environ.get("GITLAB_REMOTE_NAME", "gitlab") def sort_series(path: pathlib.Path) -> None: """Sort series in separate folder Parameters ---------- path : str path to dicoms """ files = path.glob(os.path.join(path, "*")) for f in files: if not os.path.isfile(f): continue dic = dicom.read_file(f, stop_before_pixels=True) # series_number = dic.SeriesNumber series_instance_uid = dic.SeriesInstanceUID subpath = os.path.join(path, series_instance_uid) if not os.path.exists(subpath): os.mkdir(subpath) os.rename(f, os.path.join(subpath, os.path.basename(f))) def _build_arg_parser() -> argparse.ArgumentParser: p = argparse.ArgumentParser( description="dicom_indexer - indexes dicoms into datalad" ) p.add_argument("input", help="path/url of the dicom.") p.add_argument( "--gitlab-url", type=str, default=os.environ.get("GITLAB_SERVER", None), help="http(s) url to the gitlab server where to push repos", ) p.add_argument( "--gitlab-group-template", default="{ReferringPhysicianName}/{StudyDescription.replace(" ^ "," / ")}", type=str, help="string with placeholder for dicom tags", ) p.add_argument( "--session-name-tag", default="PatientName", type=str, help="dicom tags that contains the name of the session", ) p.add_argument("--storage-remote", help="url to the datalad remote") p.add_argument( "--sort-series", action="store_true", type=bool, default=True, help="sort dicom series in separate folders", ) p.add_argument( "--fake-dates", type=bool, action="store_true", help="use fake dates for datalad dataset", ) p.add_argument( "--p7z-opts", type=str, default="-mx5 -ms=off", help="option for 7z generated archives", ) return p def main() -> None: parser = _build_arg_parser() args = parser.parse_args() input = urllib.parse.urlparse(args.input) output_remote = urllib.parse.urlparse(args.storage_remote) gitlab_url = urllib.parse.urlparse(args.gitlab_url) with index_dicoms( input, sort_series=p.sort_series, fake_dates=p.fake_dates, p7z_opts=p.p7z_opts, gitlab_group_template=args.gitlab_group_template, ) as dicom_session_ds: session_metas = extract_session_metas(dicom_session_ds) if not input.scheme or input.scheme == "file" or args.force_export: export_data(dicom_session_ds, output_remote, session_metas) setup_gitlab_remote( dicom_session_ds, gitlab_url=gitlab_url, dicom_session_name=args.session_name_tag, session_metas=session_meta, ) def index_dicoms( input: urllib.parse.ParseResult, sort_series: bool, fake_dates: bool, p7z_opts: str, ) -> dlad.Dataset: """Process incoming dicoms into datalad repo""" with tempfile.TemporaryDirectory() as tmpdirname: dicom_session_ds = dlad.create(tmpdirname, fake_dates=fake_dates) if not input.scheme or input.scheme == "file": dest = import_local_data( dicom_session_ds, pathlib.Path(input.path), sort_series=sort_series, p7z_opts=p7z_opts, ) elif input.scheme in ["http", "https", "s3"]: dest = import_remote_data(dicom_session_ds, input_url) # index dicoms files dicom_session_ds.add_archive_content( dest, strip_leading_dirs=True, commit=False, ) # cannot pass message above so commit now dicom_session_ds.save(message="index dicoms from archive") # # optimize git index after large import dicom_session_ds.repo.gc() # aggressive by default yield dicom_session_ds def export_data( dicom_session_ds: dlad.Dataset, output_remote: urllib.parse.ParseResult, session_metas: dict, ): if output_remote.scheme == "ria": export_to_ria(dicom_session_ds, output_remote, session_metas) elif output_remote.scheme == "s3": export_to_s3(dicom_session_ds, output_remote, session_metas) def setup_gitlab_repos( dicom_session_ds: dlad.Dataset, gitlab_url: urllib.parse.ParseResult, gitlab_group_path: str, session_metas: dict, ) -> None: gitlab_conn = connect_gitlab(gitlab_url) gitlab_group_path = gitlab_group_template.format(session_metas) dicom_sourcedata_path = "/".join([dicom_session_path, "sourcedata/dicoms"]) dicom_session_path = "/".join( [dicom_sourcedata_path, session_metas["StudyInstanceUID"]] ) dicom_study_path = "/".join([dicom_sourcedata_path, "study"]) dicom_session_repo = get_or_create_gitlab_project(gl, dicom_session_path) ds.siblings( action="configure", # allow to overwrite existing config name=GITLAB_REMOTE_NAME, url=dicom_session_repo._attrs["ssh_url_to_repo"], ) ds.push(to=GITLAB_REMOTE_NAME) study_group = get_or_create_group(gl, gitlab_group_path) bot_user = gl.users.list(username=GITLAB_BOT_USERNAME)[0] study_group.members.create( { "user_id": bot_user.id, "access_level": gitlab.const.AccessLevel.MAINTAINER, } ) ## add the session to the dicom study repo dicom_study_repo = get_or_create_project(gl, dicom_study_path) with tempfile.TemporaryDirectory() as tmpdir: dicom_study_ds = datalad.api.install( source=dicom_study_repo._attrs["ssh_url_to_repo"], path=tmpdir, ) if dicom_study_ds.repo.get_hexsha() is None or dicom_study_ds.id is None: dicom_study_ds.create(force=True) dicom_study_ds.push(to="origin") # add default study DS structure. init_dicom_study(dicom_study_ds, gitlab_group_path) # initialize BIDS project init_bids(gl, dicom_study_repo, gitlab_group_path) # create subgroup for QC and derivatives repos create_group(gl, f"{gitlab_group_path}/derivatives") create_group(gl, f"{gitlab_group_path}/qc") dicom_study_ds.install( source=dicom_session_repo._attrs["ssh_url_to_repo"], path=session_meta["PatientName"], ) # Push to gitlab + local ria-store dicom_study_ds.push(to="origin") dicom_study_ds.push(to=UNF_DICOMS_RIA_NAME) def init_bids( gl: gitlab.Gitlab, dicom_study_repo: dlad.Dataset, gitlab_group_path: str, ) -> None: bids_project_repo = create_project(gl, f"{gitlab_group_path}/bids") with tempfile.TemporaryDirectory() as tmpdir: bids_project_ds = datalad.api.install( source=bids_project_repo._attrs["ssh_url_to_repo"], path=tmpdir, ) bids_project_ds.create(force=True) shutil.copytree("repo_templates/bids", bids_project_ds.path, dirs_exist_ok=True) bids_project_ds.save(path=".", message="init structure and pipelines") bids_project_ds.install( path="sourcedata/dicoms", source=dicom_study_repo._attrs["ssh_url_to_repo"], ) # TODO: setup sensitive / non-sensitive S3 buckets bids_project_ds.push(to="origin") # create dev branch and push for merge requests bids_project_ds.gitrepo.checkout(BIDS_DEV_BRANCH, ["-b"]) bids_project_ds.push(to="origin") bids_project_ds.protectedbranches.create(data={"name": "convert/*"}) bids_project_ds.protectedbranches.create(data={"name": "dev"}) def init_dicom_study( dicom_study_ds: dlad.Dataset, gitlab_group_path: str, ) -> None: shutil.copytree( "repo_templates/dicom_study", dicom_study_ds.path, dirs_exist_ok=True ) env = { "variables": { "STUDY_PATH": gitlab_group_path, "BIDS_PATH": f"{gitlab_group_path}/bids", } } with open(os.path.join(dicom_study_ds.path, "ci-env.yml"), "w") as outfile: yaml.dump(env, outfile, default_flow_style=False) dicom_study_ds.save(path=".", message="init structure and pipelines") dicom_study_ds.push(to="origin") SESSION_META_KEYS = [ "StudyInstanceUID", "PatientID", "PatientName", "ReferringPhysicianName", "StudyDate", "StudyDescription", ] def extract_session_metas(dicom_session_ds: dlad.Dataset) -> dict: all_files = dicom_session_ds.repo.find("*") for f in all_files: try: dic = dicom.read_file(f, stop_before_pixels=True) except Exception: # TODO: what exception occurs when non-dicom ? continue # return at first dicom found return {k: getattr(dic, k) for k in SESSION_META_KEYS} def import_local_data( dicom_session_ds: dlad.Dataset, input_path: pathlib.Path, sort_series: bool = True, p7z_opts: str = "-mx5", ): dest = input_path.basename() if input_path.is_dir(): dest = dest + ".7z" # create 7z archive with 1block/file parameters subprocess.run( ["7z", "u", str(dest), "."] + p7z_opts, cwd=str(dicom_session_ds.path), ) elif input_path.is_file(): dest = dicom_session_ds.path / dest try: # try hard-linking to avoid copying os.link(str(input_path), str(dest)) except OSError: # fallback if hard-linking not supported shutil.copyfile(str(input_path), str(dest)) dicom_session_ds.save(dest, message="add dicoms archive") return dest def import_remote_data( dicom_session_ds: dlad.Dataset, input_url: urllib.parse.ParseResult ): try: dest = pathlib.Path(url.path).basename dicom_session_ds.repo.add_url_to_file(dest, url) except Exception: ... # TODO: check how things can fail here and deal with it. return dest def export_to_ria( ds: dlad.Dataset, ria_url: urllib.parse.ParseResult, session_metas: dict, ): ria_name = pathlib.Path(ria_url.path).basename ds.create_sibling_ria( ria_url, name=ria_name, alias=session_meta["PatientID"], existing="reconfigure" ) ds.push(to=ria_name, data="nothing") ria_sibling_path = pathlib.Path(ds.siblings(name=ria_name)[0]["url"]) archive_path = ria_sibling_path / "archives" / "archive.7z" ds.export_archive_ora( archive_path, opts=[f"-mx{COMPRESSION_LEVEL}"], missing_content="error" ) ds.repo.fsck(remote=f"{ria_url}-storage", fast=True) # index ds.push(to=ria_name, data="nothing") def export_to_s3( ds: dlad.Dataset, s3_url: urllib.parse.ParseResult, session_metas: dict, ): ds.repo.initremote() # git-annex initremote remotename ... # git-annex wanted remotename include=**.{7z,tar.gz,zip} # datalad push --data auto --to remotename def connect_gitlab( gitlab_url: urllib.parse.ParseResult, debug: bool = False ) -> gitlab.Gitlab: """ Connection to Gitlab """ gl = gitlab.Gitlab(str(gitlab_url), private_token=GITLAB_TOKEN) if debug: gl.enable_debug() gl.auth() return gl def get_or_create_gitlab_group( gl: gitlab.Gitlab, group_path: str, ): """fetch or create a gitlab group""" group_list = group.split("/") found = False for keep_groups in reversed(range(len(group_list) + 1)): tmp_repo_path = "/".join(group_list[0:keep_groups]) logging.warning(tmp_repo_path) gs = gl.groups.list(search=tmp_repo_path) for g in gs: if g.full_path == tmp_repo_path: found = True break if found: break for nb_groups in range(keep_groups, len(group_list)): if nb_groups == 0: msg = "Creating group {}".format(group_list[nb_groups]) logging.warning(msg) logging.warning(len(msg) * "=") g = gl.groups.create( {"name": group_list[nb_groups], "path": group_list[nb_groups]} ) else: msg = "Creating group {} from {}".format(group_list[nb_groups], g.name) logging.warning(msg) logging.warning(len(msg) * "=") g = gl.groups.create( { "name": group_list[nb_groups], "path": group_list[nb_groups], "parent_id": g.id, } ) return g def get_or_create_gitlab_project(gl: gitlab.Gitlab, project_path: str): """fetch or create a gitlab repo""" project_name = project_path.split("/") if len(project_name) == 1: # Check if exists p = gl.projects.list(search=project_name[0]) if not p: p = gl.projects.create({"name": project_name[0], "path": project_name[0]}) return p.id else: return p[0].id # Look for exact repo/project: p = gl.projects.list(search=project_name[-1]) if p: for curr_p in p: if curr_p.path_with_namespace == project_path: return curr_p g = get_or_create_gitlab_group(gl, project_name[:-1]) p = gl.projects.create({"name": project_name[-1], "namespace_id": g.id}) return p