# Source code for gplay_scraper.utils.helpers

"""Helper functions for data processing and manipulation.

This module contains utility functions for:
- Text unescaping and cleaning
- JSON string cleaning
- Date parsing and calculations
- Install metrics calculations
"""

import re
import json
from html import unescape
from typing import Any, List, Optional, Dict
from datetime import datetime, timezone


def unescape_text(s: Optional[str]) -> Optional[str]:
    """Convert an HTML fragment to plain text.

    Line-break tags become newlines, every other tag is removed, HTML
    entities are decoded, and leading/trailing whitespace is trimmed.

    Args:
        s: Input string that may contain HTML markup, or None.

    Returns:
        The cleaned text, or None when ``s`` is None.
    """
    if s is None:
        return None

    # Match <br>, <br/> and <br /> in any letter case. Literal lowercase
    # replacements would let variants such as <BR> reach the generic tag
    # stripper below, silently losing the line break.
    text = re.sub(r'<br\s*/?>', '\n', s, flags=re.IGNORECASE)

    # Remove the simple inline formatting tags literally, in a fixed order,
    # before the generic stripper runs.
    for tag in ("b", "i", "u", "strong", "em"):
        text = text.replace("<" + tag + ">", "").replace("</" + tag + ">", "")

    # Strip any remaining tags *before* decoding entities, so escaped markup
    # such as "&lt;b&gt;" survives as the literal text "<b>".
    text = re.sub(r'<[^>]+>', '', text)
    return unescape(text).strip()
def clean_json_string(json_str: str) -> str:
    """Clean a malformed, JavaScript-flavoured JSON string from Google Play.

    A fixed sequence of regex rewrites is applied. The order is significant:
    each rule operates on the output of the rules before it.

    Args:
        json_str: Raw JSON-like string.

    Returns:
        The string after all rewrite rules have been applied.
    """
    rewrite_rules = (
        # Drop a ", sideChannel: {}" member.
        (r',\s*sideChannel:\s*\{\}', ''),
        # Double-quote bare identifier keys that follow "{" or ",".
        (r'([{,]\s*)([a-zA-Z_$][a-zA-Z0-9_$]*)\s*:', r'\1"\2":'),
        # Replace function expressions (bodies without a nested "}") by null.
        (r'\bfunction\s*\([^)]*\)\s*\{[^}]*\}', 'null'),
        # JavaScript undefined has no JSON counterpart; use null.
        (r'\bundefined\b', 'null'),
        # Turn a single-quoted value after a colon into a double-quoted one.
        (r":\s*'([^']*)'", r': "\1"'),
        # Insert the missing comma between back-to-back arrays ...
        (r'(\])\s*(\[)', r'\1,\2'),
        # ... and between back-to-back objects.
        (r'(\})\s*(\{)', r'\1,\2'),
        # Remove a trailing comma in front of a closing "}" or "]".
        (r',(\s*[}\]])', r'\1'),
        # Collapse runs of consecutive commas into one.
        (r',,+', ','),
        # Quote "$"-prefixed numeric values that follow a colon.
        (r':\s*\$([0-9.]+)', r': "$\1"'),
        # Quote an unquoted digits-and-dots "version" value.
        (r'"version"\s*:\s*([0-9.]+)(?=\s*[,}])', r'"version": "\1"'),
    )

    cleaned = json_str
    for pattern, replacement in rewrite_rules:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned
def _find_array_end(text: str, start: int) -> int:
    """Return the index just past the ``]`` closing the ``[`` at ``start``.

    Brackets inside double-quoted string literals (including strings that
    contain backslash-escaped quotes) are ignored. Returns -1 when no
    matching closing bracket is found.
    """
    depth = 0
    in_string = False
    escaped = False
    for pos in range(start, len(text)):
        ch = text[pos]
        if in_string:
            if escaped:
                # The character after a backslash never ends the string.
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == '[':
            depth += 1
        elif ch == ']':
            depth -= 1
            if depth == 0:
                return pos + 1
    return -1


def alternative_json_clean(json_str: str) -> str:
    """Alternative JSON cleaning method using bracket matching.

    Looks for the first ``data:`` marker, extracts the bracket-balanced array
    that follows it and, if that array is valid JSON, returns it wrapped in a
    ``{"key": "ds:5", "hash": "13", "data": ...}`` object. Otherwise the
    whole input has ``NaN`` tokens replaced by ``null`` and is passed to
    ``clean_json_string``.

    Bracket matching skips ``[`` and ``]`` that occur inside string literals,
    so arrays containing text such as ``"a]b"`` are extracted in full instead
    of being cut short and sent down the regex fallback.

    Args:
        json_str: Raw JSON string.

    Returns:
        Cleaned JSON string.
    """
    data_start = json_str.find('data:')
    if data_start != -1:
        bracket_start = json_str.find('[', data_start)
        if bracket_start != -1:
            data_end = _find_array_end(json_str, bracket_start)
            if data_end != -1:
                try:
                    parsed_array = json.loads(json_str[bracket_start:data_end])
                except json.JSONDecodeError:
                    # Deliberate best effort: fall through to regex cleanup.
                    pass
                else:
                    return json.dumps({
                        "key": "ds:5",
                        "hash": "13",
                        "data": parsed_array
                    })

    json_str = re.sub(r'\bNaN\b', 'null', json_str)
    return clean_json_string(json_str)
def parse_release_date(release_date_str: Optional[str]) -> Optional[datetime]:
    """Turn a release date string into a datetime.

    Args:
        release_date_str: Date text in the form 'Mon DD, YYYY'
            (``"%b %d, %Y"``), or None.

    Returns:
        The parsed datetime (naive, since the format carries no timezone),
        or None when the input is None or cannot be parsed.
    """
    parsed: Optional[datetime] = None
    if release_date_str is not None:
        try:
            parsed = datetime.strptime(release_date_str, "%b %d, %Y")
        except (ValueError, TypeError):
            # Wrong format or non-string input: report "no date".
            parsed = None
    return parsed
def calculate_app_age(release_date_str: Optional[str], current_date: datetime) -> Optional[int]:
    """Compute how many whole days have passed since an app's release.

    Args:
        release_date_str: Release date string understood by
            ``parse_release_date``.
        current_date: Reference "now" used for the calculation.

    Returns:
        Days elapsed since release, never negative, or None when the
        release date cannot be parsed.
    """
    released = parse_release_date(release_date_str)
    if released is None:
        return None

    # A timezone-aware current_date cannot be subtracted from a naive
    # datetime, so a naive release date is tagged as UTC in that case.
    needs_timezone = current_date.tzinfo is not None and released.tzinfo is None
    if needs_timezone:
        released = released.replace(tzinfo=timezone.utc)

    elapsed_days = (current_date - released).days
    # A release date in the future counts as age zero.
    return elapsed_days if elapsed_days > 0 else 0
def parse_installs_string(installs_str: Optional[str]) -> Optional[int]:
    """Parse an install count string to an integer.

    Thousands separators (``,``) and the trailing ``+`` are removed before
    conversion.

    Args:
        installs_str: Install count string (e.g., '1,000,000+'), or None.

    Returns:
        Integer install count, or None when the input is None, is not a
        string, or does not contain a plain integer after cleaning.
    """
    # Non-string input (including None) has nothing to parse. Checking the
    # type up front honours the "or None" contract instead of letting
    # ``.replace`` raise on objects that are not strings.
    if not isinstance(installs_str, str):
        return None

    cleaned_str = installs_str.replace(',', '').replace('+', '')
    try:
        return int(cleaned_str)
    except ValueError:
        return None
def calculate_daily_installs(
    install_count,
    release_date_str: Optional[str],
    current_date: datetime,
) -> Optional[int]:
    """Estimate the average number of installs per day since release.

    Args:
        install_count: Total installs, either a number or an install count
            string that ``parse_installs_string`` can read.
        release_date_str: Release date string understood by
            ``parse_release_date``.
        current_date: Reference "now" used for the calculation.

    Returns:
        Average installs per day truncated to an integer, 0 when no full day
        has elapsed since release, or None when the install count or the
        release date is missing or cannot be parsed.
    """
    total_installs = install_count
    if isinstance(total_installs, str):
        total_installs = parse_installs_string(total_installs)

    if total_installs is None or release_date_str is None:
        return None

    released = parse_release_date(release_date_str)
    if released is None:
        return None

    # Aware and naive datetimes cannot be subtracted; tag a naive release
    # date as UTC when current_date carries a timezone.
    if current_date.tzinfo is not None and released.tzinfo is None:
        released = released.replace(tzinfo=timezone.utc)

    elapsed_days = (current_date - released).days
    if elapsed_days <= 0:
        # Also guards against dividing by zero.
        return 0
    return int(total_installs / elapsed_days)
def calculate_monthly_installs(
    install_count,
    release_date_str: Optional[str],
    current_date: datetime,
) -> Optional[int]:
    """Estimate the average number of installs per month since release.

    Args:
        install_count: Total installs, either a number or an install count
            string that ``parse_installs_string`` can read.
        release_date_str: Release date string understood by
            ``parse_release_date``.
        current_date: Reference "now" used for the calculation.

    Returns:
        Average installs per month truncated to an integer, 0 when no full
        day has elapsed since release, or None when the install count or the
        release date is missing or cannot be parsed.
    """
    total_installs = install_count
    if isinstance(total_installs, str):
        total_installs = parse_installs_string(total_installs)

    if total_installs is None or release_date_str is None:
        return None

    released = parse_release_date(release_date_str)
    if released is None:
        return None

    # Aware and naive datetimes cannot be subtracted; tag a naive release
    # date as UTC when current_date carries a timezone.
    if current_date.tzinfo is not None and released.tzinfo is None:
        released = released.replace(tzinfo=timezone.utc)

    elapsed_days = (current_date - released).days
    if elapsed_days <= 0:
        # Also guards against dividing by zero.
        return 0

    # A month is taken as 30.44 days (presumably 365.25 / 12, rounded).
    elapsed_months = elapsed_days / 30.44
    return int(total_installs / elapsed_months)