Data Manager Module

DataManager

Source code in yooink/data/data_manager.py
class DataManager:
    def __init__(self) -> None:
        """Initializes the DataManager."""
        pass

    @staticmethod
    def process_file(catalog_file: str, use_dask: bool = False
                     ) -> xr.Dataset | None:
        """
        Download and process a NetCDF file into an xarray dataset.

        Args:
            catalog_file: URL or path to the NetCDF file.
            use_dask: Whether to use dask for processing (for large files).

        Returns:
            The xarray dataset.
        """
        try:
            # Convert the catalog file URL to the data URL
            tds_url = ('https://opendap.oceanobservatories.org/thredds/'
                       'fileServer/')
            data_url = re.sub(
                r'catalog.html\?dataset=', tds_url, catalog_file)

            # Download the dataset
            r = requests.get(data_url, timeout=(3.05, 120))
            if not r.ok:
                warnings.warn(f"Failed to download {catalog_file}")
                return None

            # Load the data into an xarray dataset
            data = io.BytesIO(r.content)
            if use_dask:
                ds = xr.open_dataset(
                    data, decode_cf=False, chunks='auto', mask_and_scale=False)
            else:
                ds = xr.load_dataset(
                    data, decode_cf=False, mask_and_scale=False)

            # Process the dataset
            ds = ds.swap_dims({'obs': 'time'}).reset_coords()
            ds = ds.sortby('time')

            # Drop unnecessary variables
            keys_to_drop = ['obs', 'id', 'provenance', 'driver_timestamp',
                            'ingestion_timestamp']
            ds = ds.drop_vars([key for key in keys_to_drop
                               if key in ds.variables])

            return ds
        except Exception as e:
            warnings.warn(f"Error processing {catalog_file}: {e}")
            return None

    def merge_frames(self, frames: List[xr.Dataset]) -> xr.Dataset:
        """
        Merge multiple datasets into a single xarray dataset.

        Args:
            frames: A list of xarray datasets to merge.

        Returns:
            The merged xarray dataset.
        """
        if len(frames) == 1:
            return frames[0]

        # Attempt to merge the datasets
        try:
            data = xr.concat(frames, dim='time')
        except ValueError:
            # If concatenation fails, attempt merging one by one
            data, failed = self._frame_merger(frames[0], frames)
            if failed > 0:
                warnings.warn(f"{failed} frames failed to merge.")

        # Sort by time and remove duplicates
        data = data.sortby('time')
        _, index = np.unique(data['time'], return_index=True)
        data = data.isel(time=index)

        return data

    @staticmethod
    def _frame_merger(
            data: xr.Dataset, frames: List[xr.Dataset]
    ) -> tuple[xr.Dataset, int]:
        """
        Helper function to merge datasets one-by-one if bulk concatenation
        fails.

        Args:
            data: The initial dataset to merge.
            frames: The remaining datasets to merge into the initial one.

        Returns:
            The merged dataset and a count of failed merges.
        """
        failed = 0
        for frame in frames[1:]:
            try:
                data = xr.concat([data, frame], dim='time')
            except (ValueError, NotImplementedError):
                try:
                    data = data.merge(frame, compat='override')
                except (ValueError, NotImplementedError):
                    failed += 1
        return data, failed
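
Taken together, process_file and merge_frames cover the usual fetch-then-combine workflow. The sketch below is a minimal, hypothetical example: the catalog entries are placeholders, and a real OOI THREDDS dataset path would need to be substituted after dataset=.

from yooink.data.data_manager import DataManager

# Hypothetical catalog entries; real dataset paths would appear after "dataset=".
catalog_files = [
    'catalog.html?dataset=ooi/example-user/example-stream/deployment0001_example.nc',
    'catalog.html?dataset=ooi/example-user/example-stream/deployment0002_example.nc',
]

manager = DataManager()

# process_file returns None on failure, so filter those out before merging.
frames = [DataManager.process_file(f) for f in catalog_files]
frames = [frame for frame in frames if frame is not None]

if frames:
    combined = manager.merge_frames(frames)
    print(combined)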

__init__()

Initializes the DataManager.

Source code in yooink/data/data_manager.py
def __init__(self) -> None:
    """Initializes the DataManager."""
    pass

merge_frames(frames)

Merge multiple datasets into a single xarray dataset.

Parameters:

    Name      Type             Description                             Default
    frames    List[Dataset]    A list of xarray datasets to merge.     required

Returns:

    Type       Description
    Dataset    The merged xarray dataset.

Source code in yooink/data/data_manager.py
def merge_frames(self, frames: List[xr.Dataset]) -> xr.Dataset:
    """
    Merge multiple datasets into a single xarray dataset.

    Args:
        frames: A list of xarray datasets to merge.

    Returns:
        The merged xarray dataset.
    """
    if len(frames) == 1:
        return frames[0]

    # Attempt to merge the datasets
    try:
        data = xr.concat(frames, dim='time')
    except ValueError:
        # If concatenation fails, attempt merging one by one
        data, failed = self._frame_merger(frames[0], frames)
        if failed > 0:
            warnings.warn(f"{failed} frames failed to merge.")

    # Sort by time and remove duplicates
    data = data.sortby('time')
    _, index = np.unique(data['time'], return_index=True)
    data = data.isel(time=index)

    return data
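
A minimal sketch of the merge behaviour with two small synthetic datasets whose time axes overlap (the variable name temperature is purely illustrative): after concatenation the duplicate timestamp is dropped and the result is sorted by time.

import xarray as xr

from yooink.data.data_manager import DataManager

# Two synthetic frames sharing the timestamp 2.0.
frame_a = xr.Dataset(
    {'temperature': ('time', [10.0, 11.0, 12.0])},
    coords={'time': [0.0, 1.0, 2.0]})
frame_b = xr.Dataset(
    {'temperature': ('time', [12.0, 13.0, 14.0])},
    coords={'time': [2.0, 3.0, 4.0]})

merged = DataManager().merge_frames([frame_a, frame_b])
print(merged['time'].values)  # [0. 1. 2. 3. 4.] -- the duplicate 2.0 appears once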

process_file(catalog_file, use_dask=False) staticmethod

Download and process a NetCDF file into an xarray dataset.

Parameters:

    Name            Type    Description                                              Default
    catalog_file    str     URL or path to the NetCDF file.                          required
    use_dask        bool    Whether to use dask for processing (for large files).    False

Returns:

    Type              Description
    Dataset | None    The xarray dataset, or None if the download or processing fails.

Source code in yooink/data/data_manager.py
@staticmethod
def process_file(catalog_file: str, use_dask: bool = False
                 ) -> xr.Dataset | None:
    """
    Download and process a NetCDF file into an xarray dataset.

    Args:
        catalog_file: URL or path to the NetCDF file.
        use_dask: Whether to use dask for processing (for large files).

    Returns:
        The xarray dataset.
    """
    try:
        # Convert the catalog file URL to the data URL
        tds_url = ('https://opendap.oceanobservatories.org/thredds/'
                   'fileServer/')
        data_url = re.sub(
            r'catalog.html\?dataset=', tds_url, catalog_file)

        # Download the dataset
        r = requests.get(data_url, timeout=(3.05, 120))
        if not r.ok:
            warnings.warn(f"Failed to download {catalog_file}")
            return None

        # Load the data into an xarray dataset
        data = io.BytesIO(r.content)
        if use_dask:
            ds = xr.open_dataset(
                data, decode_cf=False, chunks='auto', mask_and_scale=False)
        else:
            ds = xr.load_dataset(
                data, decode_cf=False, mask_and_scale=False)

        # Process the dataset
        ds = ds.swap_dims({'obs': 'time'}).reset_coords()
        ds = ds.sortby('time')

        # Drop unnecessary variables
        keys_to_drop = ['obs', 'id', 'provenance', 'driver_timestamp',
                        'ingestion_timestamp']
        ds = ds.drop_vars([key for key in keys_to_drop
                           if key in ds.variables])

        return ds
    except Exception as e:
        warnings.warn(f"Error processing {catalog_file}: {e}")
        return None
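
For reference, the re.sub call above turns a THREDDS catalog entry into a direct fileServer download URL. Below is a small sketch of that step in isolation; the dataset path after dataset= is a made-up placeholder.

import re

tds_url = ('https://opendap.oceanobservatories.org/thredds/'
           'fileServer/')

# Hypothetical catalog entry; a real dataset path would follow "dataset=".
catalog_file = 'catalog.html?dataset=ooi/example-user/example-stream/deployment0001_example.nc'

data_url = re.sub(r'catalog.html\?dataset=', tds_url, catalog_file)
print(data_url)
# https://opendap.oceanobservatories.org/thredds/fileServer/ooi/example-user/example-stream/deployment0001_example.nc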