Request Manager Module

RequestManager

Source code in yooink/request/request_manager.py
class RequestManager:
    CACHE_FILE = "url_cache.json"

    def __init__(
            self,
            api_client: APIClient,
            use_file_cache: bool = True,
            cache_expiry: int = 14
    ) -> None:
        """
        Initializes the RequestManager with an instance of APIClient and cache
        options.

        Args:
            api_client: An instance of the APIClient class.
            use_file_cache: Whether to enable file-based caching (default
                True).
            cache_expiry: The number of days before cache entries expire
                (default 14 days).
        """
        self.api_client = api_client
        self.data_manager = DataManager()
        self.cached_urls = {}
        self.use_file_cache = use_file_cache
        self.cache_expiry = cache_expiry

        # Load cache from file if enabled
        if self.use_file_cache:
            self.load_cache_from_file()

    def load_cache_from_file(self) -> None:
        """
        Loads the cached URLs from a JSON file and removes expired entries.
        If the file is empty or contains invalid JSON, it initializes an empty
        cache.
        """
        if not os.path.exists(self.CACHE_FILE):
            return

        try:
            with open(self.CACHE_FILE, 'r') as file:
                content = file.read().strip()

                if not content:  # Check if file is empty
                    print("Cache file is empty. Initializing new cache.")
                    file_cache = {}
                else:
                    file_cache = json.loads(content)

            # Filter out expired cache entries
            current_time = time.time()
            valid_cache = {
                key: value for key, value in file_cache.items() if
                current_time - value['timestamp'] < self.cache_expiry * 86400
            }

            self.cached_urls = valid_cache
            self.save_cache_to_file()  # Save the updated cache

        except json.JSONDecodeError:
            print("Cache file contains invalid JSON. Initializing new cache.")
            self.cached_urls = {}
            self.save_cache_to_file()

    def save_cache_to_file(self) -> None:
        """
        Saves the current cached URLs to a JSON file, appending new URLs to the
        existing cache.
        """
        # Load existing cache if it exists
        file_cache = {}
        if os.path.exists(self.CACHE_FILE):
            try:
                with open(self.CACHE_FILE, 'r') as file:
                    content = file.read().strip()
                    if content:
                        file_cache = json.loads(content)
            except json.JSONDecodeError:
                print(
                    "Existing cache file contains invalid JSON. "
                    "Overwriting with new cache.")

        # Merge the in-memory cache with the file cache
        file_cache.update(self.cached_urls)

        # Write the merged cache to a temporary file, then replace the original
        # file
        temp_file = None
        try:
            temp_dir = os.path.dirname(self.CACHE_FILE)
            with tempfile.NamedTemporaryFile('w', dir=temp_dir,
                                             delete=False) as temp_file:
                json.dump(file_cache, temp_file)

            # Replace the original cache file with the temp file
            os.replace(temp_file.name, self.CACHE_FILE)

        except Exception as e:
            print(f"Error saving cache: {e}")

            # Ensure temp file is deleted if something goes wrong
            if temp_file:
                os.remove(temp_file.name)

    def list_sites(self) -> List[Dict[str, Any]]:
        """
        Lists all available sites from the API.

        Returns:
            A list of sites as dictionaries.
        """
        endpoint = ""
        return self.api_client.make_request(M2MInterface.SENSOR_URL, endpoint)

    def list_nodes(self, site: str) -> List[Dict[str, Any]]:
        """
        Lists nodes for a specific site.

        Args:
            site: The site identifier.

        Returns:
            A list of nodes as dictionaries.
        """
        endpoint = f"{site}/"
        return self.api_client.make_request(M2MInterface.SENSOR_URL, endpoint)

    def list_sensors(self, site: str, node: str) -> List[Dict[str, Any]]:
        """
        Lists sensors for a specific site and node.

        Args:
            site: The site identifier.
            node: The node identifier.

        Returns:
            A list of sensors as dictionaries.
        """
        endpoint = f"{site}/{node}/"
        return self.api_client.make_request(M2MInterface.SENSOR_URL, endpoint)

    def list_methods(
            self, site: str, node: str, sensor: str) -> List[Dict[str, Any]]:
        """
        Lists methods available for a specific sensor.

        Args:
            site: The site identifier.
            node: The node identifier.
            sensor: The sensor identifier.

        Returns:
            A list of methods as dictionaries.
        """
        endpoint = f"{site}/{node}/{sensor}/"
        return self.api_client.make_request(M2MInterface.SENSOR_URL, endpoint)

    def get_metadata(
            self, site: str, node: str, sensor: str) -> Dict[str, Any]:
        """
        Retrieves metadata for a specific sensor.

        Args:
            site: The site identifier.
            node: The node identifier.
            sensor: The sensor identifier.

        Returns:
            The metadata as a dictionary.
        """
        endpoint = f"{site}/{node}/{sensor}/metadata"
        return self.api_client.make_request(M2MInterface.SENSOR_URL, endpoint)

    def list_streams(
            self, site: str, node: str, sensor: str, method: str) \
            -> List[Dict[str, Any]]:
        """
        Lists available streams for a specific sensor and method.

        Args:
            site: The site identifier.
            node: The node identifier.
            sensor: The sensor identifier.
            method: The method (e.g., telemetered).

        Returns:
            A list of streams as dictionaries.
        """
        endpoint = f"{site}/{node}/{sensor}/{method}/"
        return self.api_client.make_request(M2MInterface.SENSOR_URL, endpoint)

    def list_deployments(
            self, site: str, node: str, sensor: str
    ) -> List[Dict[str, Any]]:
        """
        Lists deployments for a specific site, node, and sensor.

        Args:
            site: The site identifier.
            node: The node identifier.
            sensor: The sensor identifier.

        Returns:
            A list of deployments as dictionaries.
        """
        endpoint = f"{site}/{node}/{sensor}"
        return self.api_client.make_request(M2MInterface.DEPLOY_URL, endpoint)

    def get_sensor_information(
            self, site: str, node: str, sensor: str, deploy: Union[int, str]
    ) -> list:
        """
        Retrieves sensor metadata for a specific deployment.

        Args:
            site: The site identifier.
            node: The node identifier.
            sensor: The sensor identifier.
            deploy: The deployment number.

        Returns:
            A list of dictionaries with the sensor information.
        """
        endpoint = f"{site}/{node}/{sensor}/{str(deploy)}"
        return self.api_client.make_request(M2MInterface.DEPLOY_URL, endpoint)

    def get_deployment_dates(
            self,
            site: str,
            node: str,
            sensor: str,
            deploy: str | int
    ) -> Optional[Dict[str, str]]:
        """
        Retrieves the start and stop dates for a specific deployment.

        Args:
            site: The site identifier.
            node: The node identifier.
            sensor: The sensor identifier.
            deploy: The deployment number.

        Returns:
            A dictionary with the start and stop dates, or None if the
                information is not available.
        """
        sensor_info = self.get_sensor_information(site, node, sensor,
                                                  str(deploy))

        if sensor_info:
            start = time.strftime(
                '%Y-%m-%dT%H:%M:%S.000Z',
                time.gmtime(sensor_info[0]['eventStartTime'] / 1000.0))

            if sensor_info[0].get('eventStopTime'):
                stop = time.strftime(
                    '%Y-%m-%dT%H:%M:%S.000Z',
                    time.gmtime(sensor_info[0]['eventStopTime'] / 1000.0))
            else:
                stop = time.strftime(
                    '%Y-%m-%dT%H:%M:%S.000Z',
                    time.gmtime(time.time()))

            return {'start': start, 'stop': stop}
        else:
            return None

    def get_sensor_history(self, uid: str) -> Dict[str, Any]:
        """
        Retrieves the asset and calibration information for a sensor across all
        deployments.

        Args:
            uid: The unique asset identifier (UID).

        Returns:
            The sensor history as a dictionary.
        """
        endpoint = f"asset/deployments/{uid}?editphase=ALL"
        return self.api_client.make_request(M2MInterface.DEPLOY_URL, endpoint)

    def fetch_data(
            self, site: str, node: str, sensor: str, method: str,
            stream: str, begin_datetime: str, end_datetime: str,
            use_dask=False, tag: str = r'.*\.nc$') -> Dataset | None:
        """
        Fetch the netCDF file URLs from the THREDDS server for the given
        site, node, sensor, method, and stream, then process and merge the
        files into a single dataset.
        """
        # Construct a cache key using relevant details
        cache_key = (f"{site}_{node}_{sensor}_{method}_{stream}_"
                     f"{begin_datetime}_{end_datetime}")

        # Check if the request is already cached
        if cache_key in self.cached_urls:
            print(f"Using cached URL for request: {cache_key}")
            async_url = self.cached_urls[cache_key]['async_url']
            tds_url = self.cached_urls[cache_key]['tds_url']
            # You can now re-check the status of this request
            check_complete = async_url + '/status.txt'
            response = self.api_client.session.get(check_complete)
            if response.status_code == requests.codes.ok:
                datasets = self.get_filtered_files(
                    {'allURLs': [tds_url]}, tag)
            else:
                print(f"Data not ready yet for cached request: {cache_key}")
                return None
        else:
            # Proceed with normal request flow if not cached
            print(
                f"Requesting data for site: {site}, node: {node}, "
                f"sensor: {sensor}, method: {method}, stream: {stream}")
            data = self.wait_for_m2m_data(site, node, sensor, method, stream,
                                          begin_datetime, end_datetime)
            if not data:
                print("Request failed or timed out. Please try again later.")
                return None

            # Extract URLs from the M2M response, honoring the caller's tag
            datasets = self.get_filtered_files(data, tag)

        # Continue with processing and merging the datasets as before
        if len(datasets) > 5:
            part_files = partial(self.data_manager.process_file,
                                 use_dask=use_dask)
            with ProcessPoolExecutor(max_workers=4) as executor:
                frames = list(tqdm(executor.map(part_files, datasets),
                                   total=len(datasets),
                                   desc='Processing files'))
        else:
            frames = [self.data_manager.process_file(f, use_dask=use_dask)
                      for f in
                      tqdm(datasets, desc='Processing files')]

        return self.data_manager.merge_frames(frames)

    def wait_for_m2m_data(
            self, site: str, node: str, sensor: str, method: str,
            stream: str, begin_datetime: str, end_datetime: str) -> Any | None:
        """
        Request data from the M2M API and wait for completion, displaying
        progress with tqdm.
        """
        # Step 1: Set up request details
        params = {
            'beginDT': begin_datetime, 'endDT': end_datetime,
            'format': 'application/netcdf', 'include_provenance': 'true',
            'include_annotations': 'true'}
        details = f"{site}/{node}/{sensor}/{method}/{stream}"

        # Step 2: Make the request and get the response
        response = self.api_client.make_request(M2MInterface.SENSOR_URL,
                                                details, params)

        if 'allURLs' not in response:
            print("No URLs found in the response.")
            return None

        # Step 3: Extract the async URL and status URL
        url = [url for url in response['allURLs'] if
               re.match(r'.*async_results.*', url)][0]
        thredds_url = response['allURLs'][0]
        check_complete = url + '/status.txt'

        # Step 4: Cache the URL immediately after the request is submitted
        cache_key = (f"{site}_{node}_{sensor}_{method}_{stream}_"
                     f"{begin_datetime}_{end_datetime}")
        self.cached_urls[cache_key] = {
            'tds_url': thredds_url,
            'async_url': url,
            'timestamp': time.time()
        }

        if self.use_file_cache:
            self.save_cache_to_file()  # Save cache immediately

        # Step 5: Use tqdm to wait for completion
        print(
            "Waiting for OOINet to process and prepare the data. This may "
            "take up to 20 minutes.")
        with tqdm(total=400, desc='Waiting', file=sys.stdout) as bar:
            for i in range(400):
                try:
                    r = self.api_client.session.get(check_complete,
                                                    timeout=(3.05, 120))
                    if r.status_code == 200:  # Data is ready
                        bar.n = 400  # Complete the progress bar
                        return response
                    elif r.status_code == 404:
                        pass  # Not ready yet; keep polling
                except requests.exceptions.RequestException as e:
                    print(f"Error during status check: {e}")

                bar.update()
                bar.refresh()
                time.sleep(3)  # Wait 3 s between checks (400 × 3 s ≈ 20 min)

        # If we exit the loop without the request being ready, return None
        print("Data request timed out. Please try again later.")
        return None

    def get_filtered_files(
            self,
            data: dict,
            tag: str = r'.*\.nc$'
    ) -> List[str]:
        """
        Extract the relevant file URLs from the M2M response, filtered using a
        regex tag.

        Args:
            data: JSON response from the M2M API request.
            tag: A regex tag to filter the .nc files (default is to match any
                .nc file).

        Returns:
            A list of filtered .nc file URLs.
        """
        # Fetch the datasets page from the THREDDS server
        datasets_page = self.api_client.fetch_thredds_page(data['allURLs'][0])

        # Use the list_files function with regex to filter the files
        return self.list_files(datasets_page, tag=tag)

    @staticmethod
    def list_files(
            page_content: str,
            tag: str = r'.*\.nc$'
    ) -> List[str]:
        """
        Create a list of the NetCDF data files in the THREDDS catalog using
        regex.

        Args:
            page_content: HTML content of the THREDDS catalog page.
            tag: A regex pattern to filter files.

        Returns:
            A list of files that match the regex tag.
        """
        pattern = re.compile(tag)
        soup = BeautifulSoup(page_content, 'html.parser')
        return [node.get('href') for node in
                soup.find_all('a', string=pattern)]

__init__(api_client, use_file_cache=True, cache_expiry=14)

Initializes the RequestManager with an instance of APIClient and cache options.

Parameters:

api_client (APIClient): An instance of the APIClient class. Required.
use_file_cache (bool): Whether to enable file-based caching. Default: True.
cache_expiry (int): The number of days before cache entries expire. Default: 14.
Source code in yooink/request/request_manager.py
def __init__(
        self,
        api_client: APIClient,
        use_file_cache: bool = True,
        cache_expiry: int = 14
) -> None:
    """
    Initializes the RequestManager with an instance of APIClient and cache
    options.

    Args:
        api_client: An instance of the APIClient class.
        use_file_cache: Whether to enable file-based caching (default
            True).
        cache_expiry: The number of days before cache entries expire
            (default 14 days).
    """
    self.api_client = api_client
    self.data_manager = DataManager()
    self.cached_urls = {}
    self.use_file_cache = use_file_cache
    self.cache_expiry = cache_expiry

    # Load cache from file if enabled
    if self.use_file_cache:
        self.load_cache_from_file()
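
A minimal construction sketch, not taken from this page: the APIClient import path and constructor arguments below are assumptions, since OOI M2M access typically requires an API username and token.

from yooink.request.request_manager import RequestManager
from yooink.api import APIClient  # module path assumed

# Credentials are placeholders; use your own OOI API username and token.
client = APIClient("OOI-API-USERNAME", "OOI-API-TOKEN")

# Enable the file-backed URL cache and expire entries after 7 days.
manager = RequestManager(client, use_file_cache=True, cache_expiry=7)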

fetch_data(site, node, sensor, method, stream, begin_datetime, end_datetime, use_dask=False, tag='.*\\.nc$')

Fetch the netCDF file URLs from the THREDDS server for the given site, node, sensor, method, and stream, then process and merge the files into a single dataset.

Source code in yooink/request/request_manager.py
def fetch_data(
        self, site: str, node: str, sensor: str, method: str,
        stream: str, begin_datetime: str, end_datetime: str,
        use_dask=False, tag: str = r'.*\.nc$') -> Dataset | None:
    """
    Fetch the netCDF file URLs from the THREDDS server for the given site,
    node, sensor, method, and stream, then process and merge the files into
    a single dataset.
    """
    # Construct a cache key using relevant details
    cache_key = (f"{site}_{node}_{sensor}_{method}_{stream}_"
                 f"{begin_datetime}_{end_datetime}")

    # Check if the request is already cached
    if cache_key in self.cached_urls:
        print(f"Using cached URL for request: {cache_key}")
        async_url = self.cached_urls[cache_key]['async_url']
        tds_url = self.cached_urls[cache_key]['tds_url']
        # You can now re-check the status of this request
        check_complete = async_url + '/status.txt'
        response = self.api_client.session.get(check_complete)
        if response.status_code == requests.codes.ok:
            datasets = self.get_filtered_files(
                {'allURLs': [tds_url]}, tag)
        else:
            print(f"Data not ready yet for cached request: {cache_key}")
            return None
    else:
        # Proceed with normal request flow if not cached
        print(
            f"Requesting data for site: {site}, node: {node}, "
            f"sensor: {sensor}, method: {method}, stream: {stream}")
        data = self.wait_for_m2m_data(site, node, sensor, method, stream,
                                      begin_datetime, end_datetime)
        if not data:
            print("Request failed or timed out. Please try again later.")
            return None

        # Extract URLs from the M2M response, honoring the caller's tag
        datasets = self.get_filtered_files(data, tag)

    # Continue with processing and merging the datasets as before
    if len(datasets) > 5:
        part_files = partial(self.data_manager.process_file,
                             use_dask=use_dask)
        with ProcessPoolExecutor(max_workers=4) as executor:
            frames = list(tqdm(executor.map(part_files, datasets),
                               total=len(datasets),
                               desc='Processing files'))
    else:
        frames = [self.data_manager.process_file(f, use_dask=use_dask)
                  for f in
                  tqdm(datasets, desc='Processing files')]

    return self.data_manager.merge_frames(frames)
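
A hedged usage sketch, continuing from the construction example above; the site, node, sensor, and stream values are illustrative OOI-style reference designator parts, not verified identifiers.

# Illustrative identifiers; discover real ones with the list_* methods.
ds = manager.fetch_data(
    site="CE01ISSM", node="RID16", sensor="03-CTDBPC000",
    method="telemetered", stream="ctdbp_cdef_dcl_instrument",
    begin_datetime="2021-01-01T00:00:00.000Z",
    end_datetime="2021-01-31T23:59:59.999Z",
)
if ds is None:
    # The request URL was cached, so re-running later reuses it and
    # re-checks the processing status instead of resubmitting.
    print("Request still processing or timed out; try again later.")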

get_deployment_dates(site, node, sensor, deploy)

Retrieves the start and stop dates for a specific deployment.

Parameters:

site (str): The site identifier. Required.
node (str): The node identifier. Required.
sensor (str): The sensor identifier. Required.
deploy (str | int): The deployment number. Required.

Returns:

Optional[Dict[str, str]]: A dictionary with the start and stop dates, or None if the information is not available.

Source code in yooink/request/request_manager.py
def get_deployment_dates(
        self,
        site: str,
        node: str,
        sensor: str,
        deploy: str | int
) -> Optional[Dict[str, str]]:
    """
    Retrieves the start and stop dates for a specific deployment.

    Args:
        site: The site identifier.
        node: The node identifier.
        sensor: The sensor identifier.
        deploy: The deployment number.

    Returns:
        A dictionary with the start and stop dates, or None if the
            information is not available.
    """
    sensor_info = self.get_sensor_information(site, node, sensor,
                                              str(deploy))

    if sensor_info:
        start = time.strftime(
            '%Y-%m-%dT%H:%M:%S.000Z',
            time.gmtime(sensor_info[0]['eventStartTime'] / 1000.0))

        if sensor_info[0].get('eventStopTime'):
            stop = time.strftime(
                '%Y-%m-%dT%H:%M:%S.000Z',
                time.gmtime(sensor_info[0]['eventStopTime'] / 1000.0))
        else:
            stop = time.strftime(
                '%Y-%m-%dT%H:%M:%S.000Z',
                time.gmtime(time.time()))

        return {'start': start, 'stop': stop}
    else:
        return None
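
A short usage sketch with the same illustrative identifiers as above; the returned strings follow the '%Y-%m-%dT%H:%M:%S.000Z' format used in the source.

dates = manager.get_deployment_dates("CE01ISSM", "RID16", "03-CTDBPC000", 1)
if dates:
    # e.g. {'start': '2021-01-01T00:00:00.000Z',
    #       'stop': '2021-06-30T12:00:00.000Z'}  (illustrative values)
    print(dates["start"], "to", dates["stop"])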

get_filtered_files(data, tag='.*\\.nc$')

Extract the relevant file URLs from the M2M response, filtered using a regex tag.

Parameters:

data (dict): JSON response from the M2M API request. Required.
tag (str): A regex tag to filter the .nc files (default matches any .nc file). Default: '.*\\.nc$'.

Returns:

List[str]: A list of filtered .nc file URLs.

Source code in yooink/request/request_manager.py
def get_filtered_files(
        self,
        data: dict,
        tag: str = r'.*\.nc$'
) -> List[str]:
    """
    Extract the relevant file URLs from the M2M response, filtered using a
    regex tag.

    Args:
        data: JSON response from the M2M API request.
        tag: A regex tag to filter the .nc files (default is to match any
            .nc file).

    Returns:
        A list of filtered .nc file URLs.
    """
    # Fetch the datasets page from the THREDDS server
    datasets_page = self.api_client.fetch_thredds_page(data['allURLs'][0])

    # Use the list_files function with regex to filter the files
    return self.list_files(datasets_page, tag=tag)
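
A sketch of narrowing the result with a custom tag; the deployment-number pattern is illustrative, and data is assumed to be an M2M response dictionary containing an 'allURLs' entry.

# Keep only .nc files whose names mention a particular deployment.
files = manager.get_filtered_files(data, tag=r'.*deployment0001.*\.nc$')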

get_metadata(site, node, sensor)

Retrieves metadata for a specific sensor.

Parameters:

site (str): The site identifier. Required.
node (str): The node identifier. Required.
sensor (str): The sensor identifier. Required.

Returns:

Dict[str, Any]: The metadata as a dictionary.

Source code in yooink/request/request_manager.py
def get_metadata(
        self, site: str, node: str, sensor: str) -> Dict[str, Any]:
    """
    Retrieves metadata for a specific sensor.

    Args:
        site: The site identifier.
        node: The node identifier.
        sensor: The sensor identifier.

    Returns:
        The metadata as a dictionary.
    """
    endpoint = f"{site}/{node}/{sensor}/metadata"
    return self.api_client.make_request(M2MInterface.SENSOR_URL, endpoint)

get_sensor_history(uid)

Retrieves the asset and calibration information for a sensor across all deployments.

Parameters:

uid (str): The unique asset identifier (UID). Required.

Returns:

Dict[str, Any]: The sensor history as a dictionary.

Source code in yooink/request/request_manager.py
def get_sensor_history(self, uid: str) -> Dict[str, Any]:
    """
    Retrieves the asset and calibration information for a sensor across all
    deployments.

    Args:
        uid: The unique asset identifier (UID).

    Returns:
        The sensor history as a dictionary.
    """
    endpoint = f"asset/deployments/{uid}?editphase=ALL"
    return self.api_client.make_request(M2MInterface.DEPLOY_URL, endpoint)

get_sensor_information(site, node, sensor, deploy)

Retrieves sensor metadata for a specific deployment.

Parameters:

site (str): The site identifier. Required.
node (str): The node identifier. Required.
sensor (str): The sensor identifier. Required.
deploy (Union[int, str]): The deployment number. Required.

Returns:

list: A list of dictionaries with the sensor information.

Source code in yooink/request/request_manager.py
def get_sensor_information(
        self, site: str, node: str, sensor: str, deploy: Union[int, str]
) -> list:
    """
    Retrieves sensor metadata for a specific deployment.

    Args:
        site: The site identifier.
        node: The node identifier.
        sensor: The sensor identifier.
        deploy: The deployment number.

    Returns:
        A list of dictionaries with the sensor information.
    """
    endpoint = f"{site}/{node}/{sensor}/{str(deploy)}"
    return self.api_client.make_request(M2MInterface.DEPLOY_URL, endpoint)

list_deployments(site, node, sensor)

Lists deployments for a specific site, node, and sensor.

Parameters:

site (str): The site identifier. Required.
node (str): The node identifier. Required.
sensor (str): The sensor identifier. Required.

Returns:

List[Dict[str, Any]]: A list of deployments as dictionaries.

Source code in yooink/request/request_manager.py
def list_deployments(
        self, site: str, node: str, sensor: str
) -> List[Dict[str, Any]]:
    """
    Lists deployments for a specific site, node, and sensor.

    Args:
        site: The site identifier.
        node: The node identifier.
        sensor: The sensor identifier.

    Returns:
        A list of deployments as dictionaries.
    """
    endpoint = f"{site}/{node}/{sensor}"
    return self.api_client.make_request(M2MInterface.DEPLOY_URL, endpoint)

list_files(page_content, tag='.*\\.nc$') staticmethod

Create a list of the NetCDF data files in the THREDDS catalog using regex.

Parameters:

page_content (str): HTML content of the THREDDS catalog page. Required.
tag (str): A regex pattern to filter files. Default: '.*\\.nc$'.

Returns:

List[str]: A list of files that match the regex tag.

Source code in yooink/request/request_manager.py
@staticmethod
def list_files(
        page_content: str,
        tag: str = r'.*\.nc$'
) -> List[str]:
    """
    Create a list of the NetCDF data files in the THREDDS catalog using
    regex.

    Args:
        page_content: HTML content of the THREDDS catalog page.
        tag: A regex pattern to filter files.

    Returns:
        A list of files that match the regex tag.
    """
    pattern = re.compile(tag)
    soup = BeautifulSoup(page_content, 'html.parser')
    return [node.get('href') for node in
            soup.find_all('a', string=pattern)]
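
Because list_files() is a static method that only needs HTML, it can be exercised in isolation; a self-contained sketch with a tiny hand-written catalog snippet (the file names are illustrative):

page = '''
<a href="deployment0001_ctd.nc">deployment0001_ctd.nc</a>
<a href="catalog.html">catalog.html</a>
'''
# Only the anchor whose text matches the .nc pattern is kept.
print(RequestManager.list_files(page))  # ['deployment0001_ctd.nc']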

list_methods(site, node, sensor)

Lists methods available for a specific sensor.

Parameters:

site (str): The site identifier. Required.
node (str): The node identifier. Required.
sensor (str): The sensor identifier. Required.

Returns:

List[Dict[str, Any]]: A list of methods as dictionaries.

Source code in yooink/request/request_manager.py
def list_methods(
        self, site: str, node: str, sensor: str) -> List[Dict[str, Any]]:
    """
    Lists methods available for a specific sensor.

    Args:
        site: The site identifier.
        node: The node identifier.
        sensor: The sensor identifier.

    Returns:
        A list of methods as dictionaries.
    """
    endpoint = f"{site}/{node}/{sensor}/"
    return self.api_client.make_request(M2MInterface.SENSOR_URL, endpoint)

list_nodes(site)

Lists nodes for a specific site.

Parameters:

site (str): The site identifier. Required.

Returns:

List[Dict[str, Any]]: A list of nodes as dictionaries.

Source code in yooink/request/request_manager.py
def list_nodes(self, site: str) -> List[Dict[str, Any]]:
    """
    Lists nodes for a specific site.

    Args:
        site: The site identifier.

    Returns:
        A list of nodes as dictionaries.
    """
    endpoint = f"{site}/"
    return self.api_client.make_request(M2MInterface.SENSOR_URL, endpoint)

list_sensors(site, node)

Lists sensors for a specific site and node.

Parameters:

site (str): The site identifier. Required.
node (str): The node identifier. Required.

Returns:

List[Dict[str, Any]]: A list of sensors as dictionaries.

Source code in yooink/request/request_manager.py
def list_sensors(self, site: str, node: str) -> List[Dict[str, Any]]:
    """
    Lists sensors for a specific site and node.

    Args:
        site: The site identifier.
        node: The node identifier.

    Returns:
        A list of sensors as dictionaries.
    """
    endpoint = f"{site}/{node}/"
    return self.api_client.make_request(M2MInterface.SENSOR_URL, endpoint)

list_sites()

Lists all available sites from the API.

Returns:

List[Dict[str, Any]]: A list of sites as dictionaries.

Source code in yooink/request/request_manager.py
def list_sites(self) -> List[Dict[str, Any]]:
    """
    Lists all available sites from the API.

    Returns:
        A list of sites as dictionaries.
    """
    endpoint = ""
    return self.api_client.make_request(M2MInterface.SENSOR_URL, endpoint)

list_streams(site, node, sensor, method)

Lists available streams for a specific sensor and method.

Parameters:

site (str): The site identifier. Required.
node (str): The node identifier. Required.
sensor (str): The sensor identifier. Required.
method (str): The method (e.g., telemetered). Required.

Returns:

List[Dict[str, Any]]: A list of streams as dictionaries.

Source code in yooink/request/request_manager.py
def list_streams(
        self, site: str, node: str, sensor: str, method: str) \
        -> List[Dict[str, Any]]:
    """
    Lists available streams for a specific sensor and method.

    Args:
        site: The site identifier.
        node: The node identifier.
        sensor: The sensor identifier.
        method: The method (e.g., telemetered).

    Returns:
        A list of streams as dictionaries.
    """
    endpoint = f"{site}/{node}/{sensor}/{method}/"
    return self.api_client.make_request(M2MInterface.SENSOR_URL, endpoint)
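
The list_* methods can be chained to walk the site/node/sensor hierarchy top-down; a sketch with illustrative identifiers (print each response first to confirm its exact shape):

sites = manager.list_sites()
nodes = manager.list_nodes("CE01ISSM")
sensors = manager.list_sensors("CE01ISSM", "RID16")
methods = manager.list_methods("CE01ISSM", "RID16", "03-CTDBPC000")
streams = manager.list_streams("CE01ISSM", "RID16", "03-CTDBPC000",
                               "telemetered")
print(streams)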

load_cache_from_file()

Loads the cached URLs from a JSON file and removes expired entries. If the file is empty or contains invalid JSON, it initializes an empty cache.

Source code in yooink/request/request_manager.py
def load_cache_from_file(self) -> None:
    """
    Loads the cached URLs from a JSON file and removes expired entries.
    If the file is empty or contains invalid JSON, it initializes an empty
    cache.
    """
    if not os.path.exists(self.CACHE_FILE):
        return

    try:
        with open(self.CACHE_FILE, 'r') as file:
            content = file.read().strip()

            if not content:  # Check if file is empty
                print("Cache file is empty. Initializing new cache.")
                file_cache = {}
            else:
                file_cache = json.loads(content)

        # Filter out expired cache entries
        current_time = time.time()
        valid_cache = {
            key: value for key, value in file_cache.items() if
            current_time - value['timestamp'] < self.cache_expiry * 86400
        }

        self.cached_urls = valid_cache
        self.save_cache_to_file()  # Save the updated cache

    except json.JSONDecodeError:
        print("Cache file contains invalid JSON. Initializing new cache.")
        self.cached_urls = {}
        self.save_cache_to_file()
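
For reference, an illustrative url_cache.json entry as written by wait_for_m2m_data(); the key and URLs below are placeholders, and only the 'timestamp' field is used for expiry:

{
  "SITE_NODE_SENSOR_method_stream_beginDT_endDT": {
    "tds_url": "https://.../thredds/catalog/...",
    "async_url": "https://.../async_results/...",
    "timestamp": 1735689600.0
  }
}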

save_cache_to_file()

Saves the current cached URLs to a JSON file, appending new URLs to the existing cache.

Source code in yooink/request/request_manager.py
def save_cache_to_file(self) -> None:
    """
    Saves the current cached URLs to a JSON file, appending new URLs to the
    existing cache.
    """
    # Load existing cache if it exists
    file_cache = {}
    if os.path.exists(self.CACHE_FILE):
        try:
            with open(self.CACHE_FILE, 'r') as file:
                content = file.read().strip()
                if content:
                    file_cache = json.loads(content)
        except json.JSONDecodeError:
            print(
                "Existing cache file contains invalid JSON. "
                "Overwriting with new cache.")

    # Merge the in-memory cache with the file cache
    file_cache.update(self.cached_urls)

    # Write the merged cache to a temporary file, then replace the original
    # file
    temp_file = None
    try:
        temp_dir = os.path.dirname(self.CACHE_FILE)
        with tempfile.NamedTemporaryFile('w', dir=temp_dir,
                                         delete=False) as temp_file:
            json.dump(file_cache, temp_file)

        # Replace the original cache file with the temp file
        os.replace(temp_file.name, self.CACHE_FILE)

    except Exception as e:
        print(f"Error saving cache: {e}")

        # Ensure temp file is deleted if something goes wrong
        if temp_file:
            os.remove(temp_file.name)

wait_for_m2m_data(site, node, sensor, method, stream, begin_datetime, end_datetime)

Request data from the M2M API and wait for completion, displaying progress with tqdm.

Source code in yooink/request/request_manager.py
def wait_for_m2m_data(
        self, site: str, node: str, sensor: str, method: str,
        stream: str, begin_datetime: str, end_datetime: str) -> Any | None:
    """
    Request data from the M2M API and wait for completion, displaying
    progress with tqdm.
    """
    # Step 1: Set up request details
    params = {
        'beginDT': begin_datetime, 'endDT': end_datetime,
        'format': 'application/netcdf', 'include_provenance': 'true',
        'include_annotations': 'true'}
    details = f"{site}/{node}/{sensor}/{method}/{stream}"

    # Step 2: Make the request and get the response
    response = self.api_client.make_request(M2MInterface.SENSOR_URL,
                                            details, params)

    if 'allURLs' not in response:
        print("No URLs found in the response.")
        return None

    # Step 3: Extract the async URL and status URL
    url = [url for url in response['allURLs'] if
           re.match(r'.*async_results.*', url)][0]
    thredds_url = response['allURLs'][0]
    check_complete = url + '/status.txt'

    # Step 4: Cache the URL immediately after the request is submitted
    cache_key = (f"{site}_{node}_{sensor}_{method}_{stream}_"
                 f"{begin_datetime}_{end_datetime}")
    self.cached_urls[cache_key] = {
        'tds_url': thredds_url,
        'async_url': url,
        'timestamp': time.time()
    }

    if self.use_file_cache:
        self.save_cache_to_file()  # Save cache immediately

    # Step 5: Use tqdm to wait for completion
    print(
        "Waiting for OOINet to process and prepare the data. This may "
        "take up to 20 minutes.")
    with tqdm(total=400, desc='Waiting', file=sys.stdout) as bar:
        for i in range(400):
            try:
                r = self.api_client.session.get(check_complete,
                                                timeout=(3.05, 120))
                if r.status_code == 200:  # Data is ready
                    bar.n = 400  # Complete the progress bar
                    return response
                elif r.status_code == 404:
                    pass  # Not ready yet; keep polling
            except requests.exceptions.RequestException as e:
                print(f"Error during status check: {e}")

            bar.update()
            bar.refresh()
            time.sleep(3)  # Wait 3 s between checks (400 × 3 s ≈ 20 min)

    # If we exit the loop without the request being ready, return None
    print("Data request timed out. Please try again later.")
    return None