Skip to content

Data Fetcher Module

DataFetcher

Source code in yooink/request/data_fetcher.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
class DataFetcher:
    """
    Resolve OOI instrument descriptions and request their data via M2M.

    Holds an ``APIClient``, a ``DataManager`` and a ``RequestManager``;
    ``get_dataset`` looks up the instrument with ``filter_urls`` and then
    requests the data through the ``RequestManager``.
    """

    def __init__(self, username=None, token=None) -> None:
        """
        Initialize the DataFetcher.

        Args:
            username: API username. When falsy, the ``OOI_USER`` environment
                variable is used instead.
            token: API token. When falsy, the ``OOI_TOKEN`` environment
                variable is used instead.
        """
        self.username = username or os.getenv('OOI_USER')
        self.token = token or os.getenv('OOI_TOKEN')
        self.api_client = APIClient(self.username, self.token)
        self.data_manager = DataManager()
        self.request_manager = RequestManager(
            self.api_client, use_file_cache=True)

    @staticmethod
    def filter_urls(
            site: str,
            assembly: str,
            instrument: str,
            method: str
    ) -> tuple[List[str], List[str], List[str]]:
        """
        Filters the M2M_URLS dictionary for the instrument of interest.

        This function searches for the instrument of interest as defined by the
        site code, assembly type, instrument class, and data delivery method to
        return the OOI specific node, sensor and stream names needed to
        request the data.

        Args:
            site: OOI eight letter site code (e.g. CE04OSPS for the Oregon
                Offshore Shallow Profiler)
            assembly: Assembly grouping name (e.g. midwater for the 200 m
                Platform)
            instrument: The instrument class name (e.g. phsen for the
                SAMI2-pH sensor)
            method: The data delivery method (e.g. streamed for cabled
                streaming data)

        Returns:
            A tuple containing three index-aligned lists:
                - node: The OOI specific node code(s) for the assembly
                - sensor: The OOI specific sensor code(s) for the instrument
                    class
                - stream: The OOI specific stream name(s) for the site, node,
                    sensor and delivery method combination

        Raises:
            SyntaxError: If an unknown site code or data delivery method is
                provided.
            RuntimeWarning: If the instrument defined by the given parameters
                cannot be found, or none of the matching instruments has a
                stream for the requested delivery method.
        """
        node: List[str] = []
        sensor: List[str] = []
        stream: List[str] = []

        # Pare the larger dictionary down to the site of interest and check if
        # a valid site was used
        m2m_urls: Dict[str, Any] = M2M_URLS.get(site.upper())
        if not m2m_urls:
            raise SyntaxError(f'Unknown site code: {site}')

        # Make sure the correct data delivery method was specified
        valid_methods = ('streamed', 'telemetered', 'recovered_host',
                         'recovered_inst', 'recovered_cspp', 'recovered_wfp')
        if method not in valid_methods:
            raise SyntaxError(f'Unknown data delivery method: {method}')

        # Find the instrument(s) of interest in the assembly group
        for grouping in m2m_urls.get('assembly', []):
            if assembly not in (grouping.get('type'),
                                grouping.get('subassembly')):
                continue
            for instrmt in grouping['instrument']:
                if instrmt['class'] != instrument:
                    continue
                # An instrument without a stream for this delivery method
                # cannot be requested; skipping it keeps the three lists
                # aligned and stops a None stream from masking the
                # "cannot be found" check below.
                method_stream = instrmt['stream'].get(method)
                if not method_stream:
                    continue
                node.append(instrmt['node'])
                sensor.append(instrmt['sensor'])
                stream.append(method_stream)

        # Check to see if we were able to find the system of interest
        if not stream:
            raise RuntimeWarning(
                f'Instrument defined by {site}-{assembly}-{instrument}-'
                f'{method} cannot be found.')

        # Return the OOI specific names for the node(s), sensor(s) and
        # stream(s)
        return node, sensor, stream

    def get_dataset(
            self,
            site: str,
            assembly: str,
            instrument: str,
            method: str,
            **kwargs: Any
    ) -> xr.Dataset:
        """
        Requests data via the OOI M2M API using the site code, assembly type,
        instrument class and data delivery method.

        The OOI specific node, sensor and stream names are looked up with
        ``filter_urls`` and the request itself is made through
        ``self.request_manager``.

        Args:
            site: OOI site code as an 8 character string
            assembly: The assembly type where the instrument is located
            instrument: The OOI instrument class name for the instrument of
                interest
            method: The data delivery method for the system of interest
            **kwargs: Optional keyword arguments:
                start: Starting date/time for the data request in a
                    dateutil.parser recognizable form. If None, the beginning
                    of the data record will be used.
                stop: Ending date/time for the data request in a
                    dateutil.parser recognizable form. If None, the end of
                    the data record will be used.
                deploy: Use the deployment number (integer) to set the starting
                    and ending dates. If None, the starting and ending dates
                    are used. If both are provided, the deployment number
                    takes priority.
                aggregate: In cases where more than one instance of an
                    instrument class is part of an assembly, will collect
                    all the data if 0, or the specific instance of the
                    instrument if any value greater than 0 is used. If None,
                    the first instance of an instrument will be used.

        Returns:
            An xarray dataset containing the requested data for further
            analysis.

        Raises:
            KeyError: If an unknown keyword argument is provided.
            SyntaxError: If the date string format is unrecognizable or if an
                invalid aggregate value (negative, or larger than the number
                of available instruments) is provided.
            RuntimeWarning: If deployment dates are unavailable or if data is
                unavailable for the specified parameters.

        Note:
            Date strings without timezone information are converted with
            ``datetime.astimezone``, which treats naive values as system
            local time.
        """
        # Setup inputs to the function, make sure case is correct
        site = site.upper()
        assembly = assembly.lower()
        instrument = instrument.lower()
        method = method.lower()

        # Reject unknown keyword arguments, then read the known ones
        allowed = ('start', 'stop', 'deploy', 'aggregate')
        for key in kwargs:
            if key not in allowed:
                raise KeyError(f'Unknown keyword ({key}) argument.')
        start: Optional[str] = kwargs.get('start')
        stop: Optional[str] = kwargs.get('stop')
        deploy: Optional[int] = kwargs.get('deploy')
        aggregate: Optional[int] = kwargs.get('aggregate')

        # Use the assembly, instrument and data delivery methods to find the
        # system of interest
        node, sensor, stream = self.filter_urls(
            site, assembly, instrument, method)

        def _as_utc_iso(value: str, label: str) -> str:
            # Parse a user supplied date string and render it as a UTC
            # ISO-8601 string; `label` only feeds the error message.
            try:
                parsed = parser.parse(value)
            except (parser.ParserError, OverflowError) as err:
                raise SyntaxError(
                    f'Formatting of the {label} date string needs to be in a '
                    'recognizable format') from err
            parsed = parsed.astimezone(pytz.utc)
            return parsed.strftime('%Y-%m-%dT%H:%M:%S.000Z')

        # Check the formatting of the start and end dates. We need to be able
        # to parse and convert to an ISO format.
        if start:
            start = _as_utc_iso(start, 'starting')
        if stop:
            stop = _as_utc_iso(stop, 'ending')

        if deploy:
            # Determine start and end dates based on the deployment number;
            # these replace any start/stop given above.
            start, stop = self.request_manager.get_deployment_dates(
                site, node[0], sensor[0], deploy)
            if not start or not stop:
                raise RuntimeWarning(
                    f'Deployment dates are unavailable for {site.lower()}-'
                    f'{assembly}-{instrument}-{method}, '
                    f'deployment {deploy:02d}.')

        # For some cases, there may be more than 1 stream, but in general,
        # we only want the first one
        stream = stream[0][0] if isinstance(stream[0], list) else stream[0]

        tag = f'.*{instrument.upper()}.*\\.nc$'  # set regex tag
        data: Optional[xr.Dataset] = None  # setup the default data set

        def _fetch(node_code: str, sensor_code: str):
            # Single place for the request so every branch below passes the
            # same stream, date range and file tag.
            return self.request_manager.fetch_data(
                site, node_code, sensor_code, method, stream, start, stop,
                tag=tag
            )

        # Check if there are multiple instances of this instrument class on the
        # assembly
        if len(node) > 1:
            print(
                f'There are multiple instances of the instrument {instrument} '
                f'under {site.lower()}-{assembly}.')

        # Check if we are aggregating the multiple instruments into a single
        # data set
        if isinstance(aggregate, int):
            # A negative value would otherwise turn into a negative list
            # index and silently select the wrong instrument.
            if aggregate < 0 or aggregate > len(node):
                raise SyntaxError(
                    f'Only {len(node)} instruments available, you '
                    f'selected {aggregate}')

            if aggregate == 0:
                print(
                    f'Requesting all {len(node)} instances of this '
                    f'instrument. Data sets will be concatenated\n'
                    'and a new variable called `sensor_count` will be added '
                    'to help distinguish the \n'
                    'instruments for later processing.')
                datasets = []
                for count, (nd, sn) in enumerate(zip(node, sensor), start=1):
                    temp = _fetch(nd, sn)
                    if temp is None:
                        # Nothing came back for this instance; keep going
                        # so the remaining instances are still collected.
                        continue
                    temp['sensor_count'] = temp['deployment'] * 0 + count
                    datasets.append(temp)
                # Concatenate once instead of re-copying on every iteration
                if len(datasets) == 1:
                    data = datasets[0]
                elif datasets:
                    data = xr.concat(datasets, dim='time')
            else:
                print(f'Requesting instrument {aggregate} out of {len(node)}.')
                i = aggregate - 1
                data = _fetch(node[i], sensor[i])

        else:
            data = _fetch(node[0], sensor[0])

        if not data:
            raise RuntimeWarning(
                f'Data unavailable for {site.lower()}-{assembly}-'
                f'{instrument}-{method}.')

        # Convert strings with data types set as objects or S64 with binary
        # encoding. Iterate over a snapshot of the names because the data
        # set is reassigned inside the loop.
        for v in list(data.variables):
            if data[v].dtype == np.dtype('O') or data[v].dtype == np.dtype(
                    'S64'):
                data[v] = data[v].astype(np.str_)

        return data

__init__(username=None, token=None)

Initialize the DataFetcher.

Source code in yooink/request/data_fetcher.py
15
16
17
18
19
20
21
22
def __init__(self, username=None, token=None) -> None:
    """
    Initialize the DataFetcher.

    Args:
        username: API username. When falsy, the ``OOI_USER`` environment
            variable is used instead.
        token: API token. When falsy, the ``OOI_TOKEN`` environment
            variable is used instead.
    """
    # Fall back to the environment for any credential not supplied
    if not username:
        username = os.getenv('OOI_USER')
    if not token:
        token = os.getenv('OOI_TOKEN')
    self.username = username
    self.token = token

    # Collaborators: the API client is shared with the request manager
    client = APIClient(self.username, self.token)
    self.api_client = client
    self.data_manager = DataManager()
    self.request_manager = RequestManager(client, use_file_cache=True)

filter_urls(site, assembly, instrument, method) staticmethod

Filters the M2M_URLS dictionary for the instrument of interest.

This function searches for the instrument of interest as defined by the site code, assembly type, instrument class, and data delivery method to return the OOI specific node, sensor and stream names needed to request the data.

Parameters:

Name Type Description Default
site str

OOI eight letter site code (e.g. CE04OSPS for the Oregon Offshore Shallow Profiler)

required
assembly str

Assembly grouping name (e.g. midwater for the 200 m Platform)

required
instrument str

The instrument class name (e.g. phsen for the SAMI2-pH sensor)

required
method str

The data delivery method (e.g. streamed for cabled streaming data)

required

Returns:

Type Description
tuple[List[str], List[str], List[str]]

A tuple containing three lists: (1) node: the OOI specific node code(s) for the assembly; (2) sensor: the OOI specific sensor code(s) for the instrument class; (3) stream: the OOI specific stream name(s) for the site, node, sensor and delivery method combination.

Raises:

Type Description
SyntaxError

If an unknown site code or data delivery method is provided.

RuntimeWarning

If the instrument defined by the given parameters cannot be found.

Source code in yooink/request/data_fetcher.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@staticmethod
def filter_urls(
        site: str,
        assembly: str,
        instrument: str,
        method: str
) -> tuple[List[str], List[str], List[str]]:
    """
    Filters the M2M_URLS dictionary for the instrument of interest.

    This function searches for the instrument of interest as defined by the
    site code, assembly type, instrument class, and data delivery method to
    return the OOI specific node, sensor and stream names needed to
    request the data.

    Args:
        site: OOI eight letter site code (e.g. CE04OSPS for the Oregon
            Offshore Shallow Profiler)
        assembly: Assembly grouping name (e.g. midwater for the 200 m
            Platform)
        instrument: The instrument class name (e.g. phsen for the
            SAMI2-pH sensor)
        method: The data delivery method (e.g. streamed for cabled
            streaming data)

    Returns:
        A tuple containing three index-aligned lists:
            - node: The OOI specific node code(s) for the assembly
            - sensor: The OOI specific sensor code(s) for the instrument
                class
            - stream: The OOI specific stream name(s) for the site, node,
                sensor and delivery method combination

    Raises:
        SyntaxError: If an unknown site code or data delivery method is
            provided.
        RuntimeWarning: If the instrument defined by the given parameters
            cannot be found, or none of the matching instruments has a
            stream for the requested delivery method.
    """
    node: List[str] = []
    sensor: List[str] = []
    stream: List[str] = []

    # Pare the larger dictionary down to the site of interest and check if
    # a valid site was used
    m2m_urls: Dict[str, Any] = M2M_URLS.get(site.upper())
    if not m2m_urls:
        raise SyntaxError(f'Unknown site code: {site}')

    # Make sure the correct data delivery method was specified
    valid_methods = ('streamed', 'telemetered', 'recovered_host',
                     'recovered_inst', 'recovered_cspp', 'recovered_wfp')
    if method not in valid_methods:
        raise SyntaxError(f'Unknown data delivery method: {method}')

    # Find the instrument(s) of interest in the assembly group
    for grouping in m2m_urls.get('assembly', []):
        if assembly not in (grouping.get('type'),
                            grouping.get('subassembly')):
            continue
        for instrmt in grouping['instrument']:
            if instrmt['class'] != instrument:
                continue
            # An instrument without a stream for this delivery method
            # cannot be requested; skipping it keeps the three lists
            # aligned and stops a None stream from masking the
            # "cannot be found" check below.
            method_stream = instrmt['stream'].get(method)
            if not method_stream:
                continue
            node.append(instrmt['node'])
            sensor.append(instrmt['sensor'])
            stream.append(method_stream)

    # Check to see if we were able to find the system of interest
    if not stream:
        raise RuntimeWarning(
            f'Instrument defined by {site}-{assembly}-{instrument}-'
            f'{method} cannot be found.')

    # Return the OOI specific names for the node(s), sensor(s) and
    # stream(s)
    return node, sensor, stream

get_dataset(site, assembly, instrument, method, **kwargs)

Requests data via the OOI M2M API using the site code, assembly type, instrument class and data delivery method.

This function constructs the OOI specific data request using the parameters defined in the m2m_urls.yml file.

Parameters:

Name Type Description Default
site str

OOI site code as an 8 character string

required
assembly str

The assembly type where the instrument is located

required
instrument str

The OOI instrument class name for the instrument of interest

required
method str

The data delivery method for the system of interest

required
**kwargs Any

Optional keyword arguments. start: Starting date/time for the data request in a dateutil.parser recognizable form; if None, the beginning of the data record will be used. stop: Ending date/time for the data request in a dateutil.parser recognizable form; if None, the end of the data record will be used. deploy: Use the deployment number (integer) to set the starting and ending dates; if None, the starting and ending dates are used, and if both are provided, the deployment number takes priority. aggregate: In cases where more than one instance of an instrument class is part of an assembly, will collect all the data if 0, or the specific instance of the instrument if any value greater than 0 is used; if None, the first instance of an instrument will be used.

{}

Returns:

Type Description
Dataset

An xarray dataset containing the requested data for further analysis.

Raises:

Type Description
KeyError

If an unknown keyword argument is provided.

SyntaxError

If the date string format is unrecognizable or if an invalid aggregate value is provided.

RuntimeWarning

If deployment dates are unavailable or if data is unavailable for the specified parameters.

Source code in yooink/request/data_fetcher.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def get_dataset(
        self,
        site: str,
        assembly: str,
        instrument: str,
        method: str,
        **kwargs: Any
) -> xr.Dataset:
    """
    Requests data via the OOI M2M API using the site code, assembly type,
    instrument class and data delivery method.

    The OOI specific node, sensor and stream names are looked up with
    ``filter_urls`` and the request itself is made through
    ``self.request_manager``.

    Args:
        site: OOI site code as an 8 character string
        assembly: The assembly type where the instrument is located
        instrument: The OOI instrument class name for the instrument of
            interest
        method: The data delivery method for the system of interest
        **kwargs: Optional keyword arguments:
            start: Starting date/time for the data request in a
                dateutil.parser recognizable form. If None, the beginning
                of the data record will be used.
            stop: Ending date/time for the data request in a
                dateutil.parser recognizable form. If None, the end of
                the data record will be used.
            deploy: Use the deployment number (integer) to set the starting
                and ending dates. If None, the starting and ending dates
                are used. If both are provided, the deployment number
                takes priority.
            aggregate: In cases where more than one instance of an
                instrument class is part of an assembly, will collect
                all the data if 0, or the specific instance of the
                instrument if any value greater than 0 is used. If None,
                the first instance of an instrument will be used.

    Returns:
        An xarray dataset containing the requested data for further
        analysis.

    Raises:
        KeyError: If an unknown keyword argument is provided.
        SyntaxError: If the date string format is unrecognizable or if an
            invalid aggregate value (negative, or larger than the number
            of available instruments) is provided.
        RuntimeWarning: If deployment dates are unavailable or if data is
            unavailable for the specified parameters.

    Note:
        Date strings without timezone information are converted with
        ``datetime.astimezone``, which treats naive values as system
        local time.
    """
    # Setup inputs to the function, make sure case is correct
    site = site.upper()
    assembly = assembly.lower()
    instrument = instrument.lower()
    method = method.lower()

    # Reject unknown keyword arguments, then read the known ones
    allowed = ('start', 'stop', 'deploy', 'aggregate')
    for key in kwargs:
        if key not in allowed:
            raise KeyError(f'Unknown keyword ({key}) argument.')
    start: Optional[str] = kwargs.get('start')
    stop: Optional[str] = kwargs.get('stop')
    deploy: Optional[int] = kwargs.get('deploy')
    aggregate: Optional[int] = kwargs.get('aggregate')

    # Use the assembly, instrument and data delivery methods to find the
    # system of interest
    node, sensor, stream = self.filter_urls(
        site, assembly, instrument, method)

    def _as_utc_iso(value: str, label: str) -> str:
        # Parse a user supplied date string and render it as a UTC
        # ISO-8601 string; `label` only feeds the error message.
        try:
            parsed = parser.parse(value)
        except (parser.ParserError, OverflowError) as err:
            raise SyntaxError(
                f'Formatting of the {label} date string needs to be in a '
                'recognizable format') from err
        parsed = parsed.astimezone(pytz.utc)
        return parsed.strftime('%Y-%m-%dT%H:%M:%S.000Z')

    # Check the formatting of the start and end dates. We need to be able
    # to parse and convert to an ISO format.
    if start:
        start = _as_utc_iso(start, 'starting')
    if stop:
        stop = _as_utc_iso(stop, 'ending')

    if deploy:
        # Determine start and end dates based on the deployment number;
        # these replace any start/stop given above.
        start, stop = self.request_manager.get_deployment_dates(
            site, node[0], sensor[0], deploy)
        if not start or not stop:
            raise RuntimeWarning(
                f'Deployment dates are unavailable for {site.lower()}-'
                f'{assembly}-{instrument}-{method}, '
                f'deployment {deploy:02d}.')

    # For some cases, there may be more than 1 stream, but in general,
    # we only want the first one
    stream = stream[0][0] if isinstance(stream[0], list) else stream[0]

    tag = f'.*{instrument.upper()}.*\\.nc$'  # set regex tag
    data: Optional[xr.Dataset] = None  # setup the default data set

    def _fetch(node_code: str, sensor_code: str):
        # Single place for the request so every branch below passes the
        # same stream, date range and file tag.
        return self.request_manager.fetch_data(
            site, node_code, sensor_code, method, stream, start, stop,
            tag=tag
        )

    # Check if there are multiple instances of this instrument class on the
    # assembly
    if len(node) > 1:
        print(
            f'There are multiple instances of the instrument {instrument} '
            f'under {site.lower()}-{assembly}.')

    # Check if we are aggregating the multiple instruments into a single
    # data set
    if isinstance(aggregate, int):
        # A negative value would otherwise turn into a negative list
        # index and silently select the wrong instrument.
        if aggregate < 0 or aggregate > len(node):
            raise SyntaxError(
                f'Only {len(node)} instruments available, you '
                f'selected {aggregate}')

        if aggregate == 0:
            print(
                f'Requesting all {len(node)} instances of this '
                f'instrument. Data sets will be concatenated\n'
                'and a new variable called `sensor_count` will be added '
                'to help distinguish the \n'
                'instruments for later processing.')
            datasets = []
            for count, (nd, sn) in enumerate(zip(node, sensor), start=1):
                temp = _fetch(nd, sn)
                if temp is None:
                    # Nothing came back for this instance; keep going so
                    # the remaining instances are still collected.
                    continue
                temp['sensor_count'] = temp['deployment'] * 0 + count
                datasets.append(temp)
            # Concatenate once instead of re-copying on every iteration
            if len(datasets) == 1:
                data = datasets[0]
            elif datasets:
                data = xr.concat(datasets, dim='time')
        else:
            print(f'Requesting instrument {aggregate} out of {len(node)}.')
            i = aggregate - 1
            data = _fetch(node[i], sensor[i])

    else:
        data = _fetch(node[0], sensor[0])

    if not data:
        raise RuntimeWarning(
            f'Data unavailable for {site.lower()}-{assembly}-'
            f'{instrument}-{method}.')

    # Convert strings with data types set as objects or S64 with binary
    # encoding. Iterate over a snapshot of the names because the data set
    # is reassigned inside the loop.
    for v in list(data.variables):
        if data[v].dtype == np.dtype('O') or data[v].dtype == np.dtype(
                'S64'):
            data[v] = data[v].astype(np.str_)

    return data