zakat
 _____     _         _     _     _ _
|__  /__ _| | ____ _| |_  | |   (_) |__  _ __ __ _ _ __ _   _
  / // _` | |/ / _` | __| | |   | | '_ \| '__/ _` | '__| | | |
 / /| (_| |   < (_| | |_  | |___| | |_) | | | (_| | |  | |_| |
/____\__,_|_|\_\__,_|\__| |_____|_|_.__/|_|  \__,_|_|   \__, |
                                                        |___/

"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف
... Never Trust, Always Verify ...
This file provides the ZakatLibrary classes and functions for tracking and calculating Zakat.
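As a rough illustration of the calculation this package is built around, the sketch below re-implements three static helpers from the class source (`Nisab`, `ZakatCut`, `TimeCycle`) as plain functions. The lowercase function names, the `zakat_due` helper, and the silver price are assumptions made for this example; they are not part of the library, and the real tracker applies these rules per transaction box rather than to a single amount.

```python
# Standalone sketch. The lowercase helpers mirror ZakatTracker.Nisab,
# ZakatTracker.ZakatCut and ZakatTracker.TimeCycle; they are NOT the library API.

def nisab(gram_price: float, gram_quantity: float = 595) -> float:
    """Minimum wealth threshold: price per gram of silver times 595 grams."""
    return gram_price * gram_quantity


def zakat_cut(x: float) -> float:
    """Zakat due on an amount held for one lunar year (2.5%)."""
    return 0.025 * x


def time_cycle_ns(days: int = 355) -> int:
    """Length of the lunar-year cycle in nanoseconds."""
    return 60 * 60 * 24 * days * 10**9


def zakat_due(amount: float, held_ns: int, gram_price: float) -> float:
    """Hypothetical helper: 0.0 below the Nisab or before a full cycle, else 2.5%."""
    if amount < nisab(gram_price) or held_ns < time_cycle_ns():
        return 0.0
    return zakat_cut(amount)


silver_price = 2.5  # hypothetical price per gram in local currency
print(nisab(silver_price))  # 1487.5
print(round(zakat_due(10_000, time_cycle_ns(), silver_price), 2))  # 250.0
```

An amount below the threshold, or one held for less than a full cycle, yields `0.0` in this simplified model.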
"""
 _____     _         _     _     _ _
|__  /__ _| | ____ _| |_  | |   (_) |__  _ __ __ _ _ __ _   _
  / // _` | |/ / _` | __| | |   | | '_ \| '__/ _` | '__| | | |
 / /| (_| |   < (_| | |_  | |___| | |_) | | | (_| | |  | |_| |
/____\__,_|_|\_\__,_|\__| |_____|_|_.__/|_|  \__,_|_|   \__, |
                                                        |___/

"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف
... Never Trust, Always Verify ...

This file provides the ZakatLibrary classes and functions for tracking and calculating Zakat.
"""
# Importing necessary classes and functions from the main module
from zakat.zakat_tracker import (
    ZakatTracker,
    test,
    Action,
    JSONEncoder,
    MathOperation,
    WeekDay,
)

from zakat.file_server import (
    start_file_server,
    find_available_port,
    FileType,
)

# Version information for the module
__version__ = ZakatTracker.Version()
__all__ = [
    "ZakatTracker",
    "test",
    "Action",
    "JSONEncoder",
    "MathOperation",
    "WeekDay",
    "start_file_server",
    "find_available_port",
    "FileType",
]
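The class source that follows stores balances and box amounts as integers and converts user-facing values with `scale` and `unscale`. The sketch below shows that fixed-point idea in isolation. `scale` follows the expression used in the class source; `unscale` here is a simplified stand-in that always returns a `Decimal`, which is an assumption of this example and not the library's implementation.

```python
from decimal import Decimal


def scale(x, decimal_places: int = 2) -> int:
    """Format x to a fixed number of places, then shift it into an integer."""
    if not isinstance(x, (float, int, Decimal)):
        raise TypeError("Input 'x' must be a float, int, or Decimal.")
    return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))


def unscale(x: int, decimal_places: int = 2) -> Decimal:
    """Simplified stand-in: shift the integer back, keeping exact decimal digits."""
    return Decimal(x).scaleb(-decimal_places)


print(scale(3.14159))                          # 314
print(scale(0.1) + scale(0.2) == scale(0.3))   # True
print(unscale(scale(19.99)))                   # 19.99
```

Working on scaled integers avoids the accumulation of binary floating-point error when many small amounts are added and subtracted.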
class ZakatTracker:
    """
    A class for tracking and calculating Zakat.

    This class provides functionalities for recording transactions, calculating Zakat due,
    and managing account balances. It also offers features like importing transactions from
    CSV files, exporting data to JSON format, and saving/loading the tracker state.

    The `ZakatTracker` class is designed to handle both positive and negative transactions,
    allowing for flexible tracking of financial activities related to Zakat. It also supports
    the concepts of a "Nisab" (the minimum threshold for Zakat) and a "haul" (one complete year
    for a transaction), and can calculate the Zakat due based on the current silver price.

    The class uses a camel file as its database to persist the tracker state,
    ensuring data integrity across sessions. It also provides options for enabling or
    disabling history tracking, allowing users to choose their preferred level of detail.

    In addition, the `ZakatTracker` class includes various helper methods like
    `time`, `time_to_datetime`, `lock`, `free`, `recall`, `export_json`,
    and more. These methods provide additional functionalities and flexibility
    for interacting with and managing the Zakat tracker.

    Attributes:
        ZakatTracker.ZakatCut (function): A function to calculate the Zakat percentage.
        ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat.
        ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price.
        ZakatTracker.Version (function): The version of the ZakatTracker class.

    Data Structure:
        The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data.

        _vault (dict):
            - account (dict):
                - {account_number} (dict):
                    - balance (int): The current balance of the account.
                    - box (dict): A dictionary storing transaction details.
                        - {timestamp} (dict):
                            - capital (int): The initial amount of the transaction.
                            - count (int): The number of times Zakat has been calculated for this transaction.
                            - last (int): The timestamp of the last Zakat calculation.
                            - rest (int): The remaining amount after Zakat deductions and withdrawal.
                            - total (int): The total Zakat deducted from this transaction.
                    - count (int): The total number of transactions for the account.
                    - log (dict): A dictionary storing transaction logs.
                        - {timestamp} (dict):
                            - value (int): The transaction amount (positive or negative).
                            - desc (str): The description of the transaction.
                            - ref (int): The box reference (positive or None).
                            - file (dict): A dictionary storing file references associated with the transaction.
                    - hide (bool): Indicates whether the account is hidden or not.
                    - zakatable (bool): Indicates whether the account is subject to Zakat.
            - exchange (dict):
                - account (dict):
                    - {timestamps} (dict):
                        - rate (float): Exchange rate when compared to local currency.
                        - description (str): The description of the exchange rate.
            - history (dict):
                - {timestamp} (list): A list of dictionaries storing the history of actions performed.
                    - {action_dict} (dict):
                        - action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT).
                        - account (str): The account number associated with the action.
                        - ref (int): The reference number of the transaction.
                        - file (int): The reference number of the file (if applicable).
                        - key (str): The key associated with the action (e.g., 'rest', 'total').
                        - value (int): The value associated with the action.
                        - math (MathOperation): The mathematical operation performed (if applicable).
            - lock (int or None): The timestamp indicating the current lock status (None if not locked).
            - report (dict):
                - {timestamp} (tuple): A tuple storing Zakat report details.

    """

    @staticmethod
    def Version() -> str:
        """
        Returns the current version of the software.

        This function returns a string representing the current version of the software,
        including major, minor, and patch version numbers in the format "X.Y.Z".

        Returns:
            str: The current version of the software.
        """
        return '0.2.96'

    @staticmethod
    def ZakatCut(x: float) -> float:
        """
        Calculates the Zakat amount due on an asset.

        This function calculates the zakat amount due on a given asset value over one lunar year.
        Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth
        that exceeds a certain threshold (Nisab).

        Parameters:
            x: The total value of the asset on which Zakat is to be calculated.

        Returns:
            The amount of Zakat due on the asset, calculated as 2.5% of the asset's value.
        """
        return 0.025 * x  # Zakat Cut in one Lunar Year

    @staticmethod
    def TimeCycle(days: int = 355) -> int:
        """
        Calculates the approximate duration of a lunar year in nanoseconds.

        This function calculates the approximate duration of a lunar year based on the given number of days.
        It converts the given number of days into nanoseconds for use in high-precision timing applications.

        Parameters:
            days: The number of days in a lunar year. Defaults to 355,
                which is an approximation of the average length of a lunar year.

        Returns:
            The approximate duration of a lunar year in nanoseconds.
        """
        return int(60 * 60 * 24 * days * 1e9)  # Lunar Year in nanoseconds

    @staticmethod
    def Nisab(gram_price: float, gram_quantity: float = 595) -> float:
        """
        Calculate the total value of the Nisab (the minimum wealth threshold for Zakat in Islamic jurisprudence) based on the given price per gram.

        This function calculates the Nisab value, which is the minimum threshold of wealth
        that makes an individual liable for paying Zakat.
        The Nisab value is determined by the equivalent value of a specific amount
        of gold or silver (currently 595 grams of silver) in the local currency.

        Parameters:
        - gram_price (float): The price per gram of the Nisab metal (silver by default).
        - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver.

        Returns:
        - float: The total value of Nisab based on the given price per gram.
        """
        return gram_price * gram_quantity

    @staticmethod
    def ext() -> str:
        """
        Returns the file extension used by the ZakatTracker class.

        Returns:
            str: The file extension used by the ZakatTracker class, which is 'camel'.
        """
        return 'camel'

    def __init__(self, db_path: str = "./zakat_db/zakat.camel", history_mode: bool = True):
        """
        Initialize ZakatTracker with database path and history mode.

        Parameters:
            db_path (str): The path to the database file. Default is "./zakat_db/zakat.camel".
            history_mode (bool): The mode for tracking history. Default is True.

        Returns:
            None
        """
        self._base_path = None
        self._vault_path = None
        self._vault = None
        self.reset()
        self._history(history_mode)
        self.path(db_path)

    def path(self, path: str = None) -> str:
        """
        Set or get the path to the database file.

        If no path is provided, the current path is returned.
        If a path is provided, it is set as the new path.
        The function also creates the necessary directories if the provided path is a file.

        Parameters:
            path (str): The new path to the database file. If not provided, the current path is returned.

        Returns:
            str: The current or new path to the database file.
        """
        if path is None:
            return self._vault_path
        self._vault_path = Path(path).resolve()
        base_path = Path(path).resolve()
        if base_path.is_file() or base_path.suffix:
            base_path = base_path.parent
        base_path.mkdir(parents=True, exist_ok=True)
        self._base_path = base_path
        return str(self._vault_path)

    def base_path(self, *args) -> str:
        """
        Generate a base path by joining the provided arguments with the existing base path.

        Parameters:
            *args (str): Variable length argument list of strings to be joined with the base path.

        Returns:
            str: The generated base path. If no arguments are provided, the existing base path is returned.
        """
        if not args:
            return str(self._base_path)
        filtered_args = []
        ignored_filename = None
        for arg in args:
            if Path(arg).suffix:
                ignored_filename = arg
            else:
                filtered_args.append(arg)
        base_path = Path(self._base_path)
        full_path = base_path.joinpath(*filtered_args)
        full_path.mkdir(parents=True, exist_ok=True)
        if ignored_filename is not None:
            # Join with the ignored filename; convert to str to match the declared return type
            return str(full_path.resolve() / ignored_filename)
        return str(full_path.resolve())

    @staticmethod
    def scale(x: float | int | Decimal, decimal_places: int = 2) -> int:
        """
        Scales a numerical value by a specified power of 10, returning an integer.

        This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and
        facilitate precise scaling operations, particularly useful in financial or scientific calculations.

        Parameters:
            x: The numeric value to scale. Can be a floating-point number, integer, or decimal.
            decimal_places: The exponent for the scaling factor (10**decimal_places). Defaults to 2, meaning the input is scaled
                by a factor of 100 (e.g., converts 1.23 to 123).

        Returns:
            The scaled value, rounded to the nearest integer.

        Raises:
            TypeError: If the input `x` is not a valid numeric type.

        Examples:
            >>> ZakatTracker.scale(3.14159)
            314
            >>> ZakatTracker.scale(1234, decimal_places=3)
            1234000
            >>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4)
            50
        """
        if not isinstance(x, (float, int, Decimal)):
            raise TypeError("Input 'x' must be a float, int, or Decimal.")
        return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))

    @staticmethod
    def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal:
        """
        Unscales an integer by a power of 10.

        Parameters:
            x: The integer to unscale.
            return_type: The desired type for the returned value. Can be float or Decimal. Defaults to float.
            decimal_places: The power of 10 to use. Defaults to 2.

        Returns:
            The unscaled number, converted to the specified return_type.

        Raises:
            TypeError: If the return_type is not float or Decimal.
        """
        if return_type not in (float, Decimal):
            raise TypeError(f'Invalid return_type({return_type}). Supported types are float and Decimal.')
        return round(return_type(x / (10 ** decimal_places)), decimal_places)

    def _history(self, status: bool = None) -> bool:
        """
        Enable or disable history tracking, or query its current state.

        Parameters:
            status (bool): The new status of history tracking. If None (the default), the status is left unchanged.

        Returns:
            bool: The current status of history tracking.
        """
        if status is not None:
            self._history_mode = status
        return self._history_mode

    def reset(self) -> None:
        """
        Reset the internal data structure to its initial state.

        Parameters:
            None

        Returns:
            None
        """
        self._vault = {
            'account': {},
            'exchange': {},
            'history': {},
            'lock': None,
            'report': {},
        }

    _last_time_ns = None
    _time_diff_ns = None

    @staticmethod
    def minimum_time_diff_ns() -> tuple[int, int]:
        """
        Calculates the minimum time difference between two consecutive calls to
        `ZakatTracker._time()` in nanoseconds.

        This method is used internally to determine the minimum granularity of
        time measurements within the system.

        Returns:
            tuple[int, int]:
                - The minimum time difference in nanoseconds.
                - The number of iterations required to measure the difference.
        """
        i = 0
        x = y = ZakatTracker._time()
        while x == y:
            y = ZakatTracker._time()
            i += 1
        return y - x, i

    @staticmethod
    def _time(now: datetime.datetime = None) -> int:
        """
        Internal method to generate a nanosecond-precision timestamp from a datetime object.

        Parameters:
            now (datetime.datetime, optional): The datetime object to generate the timestamp from.
                If not provided, the current datetime is used.

        Returns:
            int: The timestamp in nanoseconds since the epoch (January 1, 1AD).
        """
        if now is None:
            now = datetime.datetime.now()
        ns_in_day = (now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() * 10 ** 9
        return int(now.toordinal() * 86_400_000_000_000 + ns_in_day)

    @staticmethod
    def time(now: datetime.datetime = None) -> int:
        """
        Generates a unique, monotonically increasing timestamp based on the provided
        datetime object or the current datetime.

        This method ensures that timestamps are unique even if called in rapid succession
        by introducing a small delay if necessary, based on the system's minimum
        time resolution.

        Parameters:
            now (datetime.datetime, optional): The datetime object to generate the timestamp from.
                If not provided, the current datetime is used.

        Returns:
            int: The unique timestamp in nanoseconds since the epoch (January 1, 1AD).
        """
        new_time = ZakatTracker._time(now)
        if ZakatTracker._last_time_ns is None:
            ZakatTracker._last_time_ns = new_time
            return new_time
        while new_time == ZakatTracker._last_time_ns:
            if ZakatTracker._time_diff_ns is None:
                diff, _ = ZakatTracker.minimum_time_diff_ns()
                ZakatTracker._time_diff_ns = ceil(diff)
            sleep(ZakatTracker._time_diff_ns / 1_000_000_000)
            new_time = ZakatTracker._time()
        ZakatTracker._last_time_ns = new_time
        return new_time

    @staticmethod
    def time_to_datetime(ordinal_ns: int) -> datetime.datetime:
        """
        Converts a nanosecond-precision timestamp (ordinal number of nanoseconds since 1AD)
        back to a datetime object.

        Parameters:
            ordinal_ns (int): The timestamp in nanoseconds since the epoch (January 1, 1AD).

        Returns:
            datetime.datetime: The corresponding datetime object.
        """
        d = datetime.datetime.fromordinal(ordinal_ns // 86_400_000_000_000)
        t = datetime.timedelta(seconds=(ordinal_ns % 86_400_000_000_000) // 10 ** 9)
        return datetime.datetime.combine(d, datetime.time()) + t

    def clean_history(self, lock: int | None = None) -> int:
        """
        Cleans up the history of actions performed on the ZakatTracker instance.

        Parameters:
            lock (int, optional): The lock ID is used to clean up the empty history.
                If not provided, it cleans up the empty history records for all locks.

        Returns:
            int: The number of locks cleaned up.
        """
        count = 0
        if lock in self._vault['history']:
            if len(self._vault['history'][lock]) <= 0:
                count += 1
                del self._vault['history'][lock]
            return count
        self.free(self.lock())
        # Iterate over a copy of the keys: deleting entries while iterating
        # the dictionary itself would raise a RuntimeError.
        for lock in list(self._vault['history']):
            if len(self._vault['history'][lock]) <= 0:
                count += 1
                del self._vault['history'][lock]
        return count

    def _step(self, action: Action = None, account=None, ref: int = None, file: int = None, value: float = None,
              key: str = None, math_operation: MathOperation = None) -> int:
        """
        This method is responsible for recording the actions performed on the ZakatTracker.

        Parameters:
        - action (Action): The type of action performed.
        - account (str): The account number on which the action was performed.
        - ref (int): The reference number of the action.
        - file (int): The file reference number of the action.
        - value (int): The value associated with the action.
        - key (str): The key associated with the action.
        - math_operation (MathOperation): The mathematical operation performed during the action.

        Returns:
        - int: The lock time of the recorded action. If no lock was performed, it returns 0.
        """
        if not self._history():
            return 0
        lock = self._vault['lock']
        if self.nolock():
            lock = self._vault['lock'] = self.time()
            self._vault['history'][lock] = []
        if action is None:
            return lock
        self._vault['history'][lock].append({
            'action': action,
            'account': account,
            'ref': ref,
            'file': file,
            'key': key,
            'value': value,
            'math': math_operation,
        })
        return lock

    def nolock(self) -> bool:
        """
        Check if the vault lock is currently not set.

        Returns:
            bool: True if the vault lock is not set, False otherwise.
        """
        return self._vault['lock'] is None

    def lock(self) -> int:
        """
        Acquires a lock on the ZakatTracker instance.

        Returns:
            int: The lock ID.
                This ID can be used to release the lock later.
        """
        return self._step()

    def vault(self) -> dict:
        """
        Returns a shallow copy of the internal vault dictionary.

        This method is used to retrieve the current state of the ZakatTracker object.
        It provides a snapshot of the top-level data structure, allowing for further
        processing or analysis. Nested dictionaries are shared with the tracker, not duplicated.

        Returns:
            dict: A shallow copy of the internal vault dictionary.
        """
        return self._vault.copy()

    def stats_init(self) -> dict[str, tuple[int, str]]:
        """
        Initialize and return a dictionary containing initial statistics for the ZakatTracker instance.

        The dictionary contains two keys: 'database' and 'ram'. Each key maps to a tuple containing two elements:
        - The initial size of the respective statistic in bytes (int).
        - The initial size of the respective statistic in a human-readable format (str).

        Returns:
            dict[str, tuple]: A dictionary with initial statistics for the ZakatTracker instance.
        """
        return {
            'database': (0, '0'),
            'ram': (0, '0'),
        }

    def stats(self, ignore_ram: bool = True) -> dict[str, tuple[int, str]]:
        """
        Calculates and returns statistics about the object's data storage.

        This method determines the size of the database file on disk and the
        size of the data currently held in RAM (the vault dictionary).
        Both sizes are reported in bytes and in a human-readable format
        (e.g., KB, MB).

        Parameters:
            ignore_ram (bool): Whether to ignore the RAM size. Default is True.

        Returns:
            dict[str, tuple]: A dictionary containing the following statistics:

                * 'database': A tuple with two elements:
                    - The database file size in bytes (int).
                    - The database file size in human-readable format (str).
                * 'ram': A tuple with two elements:
                    - The RAM usage (dictionary size) in bytes (int).
                    - The RAM usage in human-readable format (str).

        Example:
            >>> stats = my_object.stats()
            >>> print(stats['database'])
            (256000, '250.0 KB')
            >>> print(stats['ram'])
            (12345, '12.1 KB')
        """
        ram_size = 0.0 if ignore_ram else self.get_dict_size(self.vault())
        file_size = os.path.getsize(self.path())
        return {
            'database': (file_size, self.human_readable_size(file_size)),
            'ram': (ram_size, self.human_readable_size(ram_size)),
        }

    def files(self) -> list[dict[str, str | int]]:
        """
        Retrieves information about the files associated with this tracker instance.

        This method provides a standardized way to gather details about
        files used by the tracker for storage, snapshots, and CSV imports.

        Returns:
            list[dict[str, str | int]]: A list of dictionaries, each containing information
                about a specific file:

                * type (str): The type of file ('database', 'snapshot', 'import_csv').
                * path (str): The full file path.
                * exists (bool): Whether the file exists on the filesystem.
                * size (int): The file size in bytes (0 if the file doesn't exist).
                * human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB').

        Example:
        ```
        file_info = tracker.files()
        for info in file_info:
            print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}")
        ```
        """
        result = []
        for file_type, path in {
            'database': self.path(),
            'snapshot': self.snapshot_cache_path(),
            'import_csv': self.import_csv_cache_path(),
        }.items():
            exists = os.path.exists(path)
            size = os.path.getsize(path) if exists else 0
            human_readable_size = self.human_readable_size(size) if exists else 0
            result.append({
                'type': file_type,
                'path': path,
                'exists': exists,
                'size': size,
                'human_readable_size': human_readable_size,
            })
        return result

    def steps(self) -> dict:
        """
        Returns a copy of the history of steps taken in the ZakatTracker.

        The history is a dictionary where each key is a unique identifier for a step,
        and the corresponding value is a dictionary containing information about the step.

        Returns:
            dict: A copy of the history of steps taken in the ZakatTracker.
        """
        return self._vault['history'].copy()

    def free(self, lock: int, auto_save: bool = True) -> bool:
        """
        Releases the lock on the database.

        Parameters:
            lock (int): The lock ID to be released.
            auto_save (bool): Whether to automatically save the database after releasing the lock.

        Returns:
            bool: True if the lock is successfully released and (optionally) saved, False otherwise.
        """
        if lock == self._vault['lock']:
            self._vault['lock'] = None
            self.clean_history(lock)
            if auto_save:
                return self.save(self.path())
            return True
        return False

    def account_exists(self, account) -> bool:
        """
        Check if the given account exists in the vault.

        Parameters:
            account (str): The account number to check.

        Returns:
            bool: True if the account exists, False otherwise.
        """
        return account in self._vault['account']

    def box_size(self, account) -> int:
        """
        Calculate the size of the box for a specific account.

        Parameters:
            account (str): The account number for which the box size needs to be calculated.

        Returns:
            int: The size of the box for the given account. If the account does not exist, -1 is returned.
        """
        if self.account_exists(account):
            return len(self._vault['account'][account]['box'])
        return -1

    def log_size(self, account) -> int:
        """
        Get the size of the log for a specific account.

        Parameters:
            account (str): The account number for which the log size needs to be calculated.

        Returns:
            int: The size of the log for the given account. If the account does not exist, -1 is returned.
        """
        if self.account_exists(account):
            return len(self._vault['account'][account]['log'])
        return -1

    @staticmethod
    def file_hash(file_path: str, algorithm: str = "blake2b") -> str:
        """
        Calculates the hash of a file using the specified algorithm.

        Parameters:
            file_path (str): The path to the file.
            algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b".

        Returns:
            str: The hexadecimal representation of the file's hash.
        """
        hash_obj = hashlib.new(algorithm)  # Create the hash object
        with open(file_path, "rb") as f:  # Open file in binary mode for reading
            for chunk in iter(lambda: f.read(4096), b""):  # Read file in chunks
                hash_obj.update(chunk)
        return hash_obj.hexdigest()  # Return the hash as a hexadecimal string

    def snapshot_cache_path(self):
        """
        Generate the path for the cache file used to store snapshots.

        The cache file is a camel file that stores the timestamps of the snapshots.
        The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel".

        Returns:
            str: The path to the cache file.
        """
        path = str(self.path())
        ext = self.ext()
        ext_len = len(ext)
        if path.endswith(f'.{ext}'):
            path = path[:-ext_len - 1]
        _, filename = os.path.split(path + f'.snapshots.{ext}')
        return self.base_path(filename)

    def snapshot(self) -> bool:
        """
        This function creates a snapshot of the current database state.

        The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists.
        If a snapshot with the same hash exists, the function returns True without creating a new snapshot.
        If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state
        in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp.

        Parameters:
            None

        Returns:
            bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails.
        """
        current_hash = self.file_hash(self.path())
        cache: dict[str, int] = {}  # hash: time_ns
        try:
            with open(self.snapshot_cache_path(), 'r') as stream:
                cache = camel.load(stream.read())
        except Exception:
            # A missing or unreadable cache file is treated as an empty cache.
            pass
        if current_hash in cache:
            return True
        time = time_ns()
        cache[current_hash] = time
        if not self.save(self.base_path('snapshots', f'{time}.{self.ext()}')):
            return False
        with open(self.snapshot_cache_path(), 'w') as stream:
            stream.write(camel.dump(cache))
        return True

    def snapshots(self, hide_missing: bool = True, verified_hash_only: bool = False) \
            -> dict[int, tuple[str, str, bool]]:
        """
        Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status.

        Parameters:
        - hide_missing (bool): If True, only include snapshots whose files exist on disk.
            Default is True.
        - verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False.

        Returns:
        - dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots,
        and the values are tuples containing the snapshot's hash, path, and existence status.
        """
        cache: dict[str, int] = {}  # hash: time_ns
        try:
            with open(self.snapshot_cache_path(), 'r') as stream:
                cache = camel.load(stream.read())
        except Exception:
            # A missing or unreadable cache file is treated as an empty cache.
            pass
        if not cache:
            return {}
        result: dict[int, tuple[str, str, bool]] = {}  # time_ns: (hash, path, exists)
        for file_hash, ref in cache.items():
            path = self.base_path('snapshots', f'{ref}.{self.ext()}')
            exists = os.path.exists(path)
            if verified_hash_only:
                # Hash only files that exist: hashing a missing snapshot would raise
                # FileNotFoundError, and a missing snapshot can never be verified.
                if not exists or self.file_hash(path) != file_hash:
                    continue
            if exists or not hide_missing:
                result[ref] = (file_hash, path, exists)
        return result

    def recall(self, dry=True, debug=False) -> bool:
        """
        Revert the last operation.

        Parameters:
            dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True.
            debug (bool): If True, the function will print debug information. Default is False.

        Returns:
            bool: True if the operation was successful, False otherwise.
        """
        if not self.nolock() or len(self._vault['history']) == 0:
            return False
        ref = sorted(self._vault['history'].keys())[-1]
        if debug:
            print('recall', ref)
        memory = self._vault['history'][ref]
        if debug:
            print(type(memory), 'memory', memory)
        limit = len(memory) + 1
        sub_positive_log_negative = 0
        # Walk the recorded steps backwards, undoing each one.
        for i in range(-1, -limit, -1):
            x = memory[i]
            if debug:
                print(type(x), x)
            match x['action']:
                case Action.CREATE:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if debug:
                                print('account', self._vault['account'][x['account']])
                            assert len(self._vault['account'][x['account']]['box']) == 0
                            assert self._vault['account'][x['account']]['balance'] == 0
                            assert self._vault['account'][x['account']]['count'] == 0
                            if dry:
                                continue
                            del self._vault['account'][x['account']]

                case Action.TRACK:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if dry:
                                continue
                            self._vault['account'][x['account']]['balance'] -= x['value']
                            self._vault['account'][x['account']]['count'] -= 1
                            del self._vault['account'][x['account']]['box'][x['ref']]

                case Action.LOG:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['log']:
                                if dry:
                                    continue
                                if sub_positive_log_negative == -x['value']:
                                    self._vault['account'][x['account']]['count'] -= 1
                                    sub_positive_log_negative = 0
                                box_ref = self._vault['account'][x['account']]['log'][x['ref']]['ref']
                                if box_ref is not None:
                                    assert self.box_exists(x['account'], box_ref)
                                    box_value = self._vault['account'][x['account']]['log'][x['ref']]['value']
                                    assert box_value < 0

                                    try:
                                        self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value
                                    except TypeError:
                                        self._vault['account'][x['account']]['box'][box_ref]['rest'] += Decimal(
                                            -box_value)

                                    try:
                                        self._vault['account'][x['account']]['balance'] += -box_value
                                    except TypeError:
                                        self._vault['account'][x['account']]['balance'] += Decimal(-box_value)

                                    self._vault['account'][x['account']]['count'] -= 1
                                del self._vault['account'][x['account']]['log'][x['ref']]

                case Action.SUB:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['box']:
                                if dry:
                                    continue
                                self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value']
                                self._vault['account'][x['account']]['balance'] += x['value']
                                sub_positive_log_negative = x['value']

                case Action.ADD_FILE:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['log']:
                                if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']:
                                    if dry:
                                        continue
                                    del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']]

                case Action.REMOVE_FILE:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['log']:
                                if dry:
                                    continue
                                self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value']

                case Action.BOX_TRANSFER:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['box']:
                                if dry:
                                    continue
                                self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value']

                case Action.EXCHANGE:
                    if x['account'] is not None:
                        if x['account'] in self._vault['exchange']:
                            if x['ref'] in self._vault['exchange'][x['account']]:
                                if dry:
                                    continue
                                del self._vault['exchange'][x['account']][x['ref']]

                case Action.REPORT:
                    if x['ref'] in self._vault['report']:
                        if dry:
                            continue
                        del self._vault['report'][x['ref']]

                case Action.ZAKAT:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['box']:
                                if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]:
                                    if dry:
                                        continue
                                    match x['math']:
                                        case MathOperation.ADDITION:
                                            self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x[
                                                'value']
                                        case MathOperation.EQUAL:
                                            self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value']
                                        case MathOperation.SUBTRACTION:
                                            self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x[
                                                'value']

        if not dry:
            del self._vault['history'][ref]
        return True

    def ref_exists(self, account: str, ref_type: str, ref: int) -> bool:
        """
        Check if a specific reference (transaction) exists in the vault for a given account and reference type.

        Parameters:
            account (str): The account number for which to check the existence of the reference.
            ref_type (str): The type of reference (e.g., 'box', 'log', etc.).
            ref (int): The reference (transaction) number to check for existence.

        Returns:
            bool: True if the reference exists for the given account and reference type, False otherwise.
        """
        if account in self._vault['account']:
            return ref in self._vault['account'][account][ref_type]
        return False

    def box_exists(self, account: str, ref: int) -> bool:
        """
        Check if a specific box (transaction) exists in the vault for a given account and reference.

        Parameters:
        - account (str): The account number for which to check the existence of the box.
        - ref (int): The reference (transaction) number to check for existence.

        Returns:
        - bool: True if the box exists for the given account and reference, False otherwise.
        """
        return self.ref_exists(account, 'box', ref)

    def track(self, unscaled_value: float | int | Decimal = 0, desc: str = '', account: str = 1, logging: bool = True,
              created: int = None,
              debug: bool = False) -> int:
        """
        This function tracks a transaction for a specific account.

        Parameters:
        unscaled_value (float | int | Decimal): The value of the transaction. Default is 0.
        desc (str): The description of the transaction. Default is an empty string.
        account (str): The account for which the transaction is being tracked. Default is '1'.
        logging (bool): Whether to log the transaction. Default is True.
        created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None.
        debug (bool): Whether to print debug information. Default is False.

        Returns:
        int: The timestamp of the transaction.

        This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box.

        Raises:
        ValueError: The log transaction happened again in the same nanosecond time.
        ValueError: The box transaction happened again in the same nanosecond time.
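
Illustrative sketch (not part of the library): the bookkeeping described above can be pictured with plain dictionaries. `track_sketch` is a hypothetical name; locking, value scaling, and history steps are left out, and `time.time_ns()` is only an assumed stand-in for the tracker's own timestamp source.

```python
import time
from typing import Optional


def track_sketch(vault: dict, value: int, account: str = "1",
                 created: Optional[int] = None) -> int:
    # Hypothetical stand-in for track(): no locking, scaling, or history.
    if created is None:
        created = time.time_ns()  # assumption: nanosecond timestamps
    # Create the account on first use, mirroring the fields used in this file.
    acc = vault.setdefault(account, {"balance": 0, "box": {}, "count": 0, "log": {}})
    if value == 0:
        return 0  # only the account is created; nothing is recorded
    if created in acc["log"] or created in acc["box"]:
        raise ValueError(f"timestamp {created} is already used")
    # The log entry records the movement and updates the running balance.
    acc["balance"] += value
    acc["count"] += 1
    acc["log"][created] = {"value": value, "desc": "", "ref": None, "file": {}}
    # The box keeps the original capital and what is still left of it.
    acc["box"][created] = {"capital": value, "count": 0, "last": 0, "rest": value, "total": 0}
    return created
```

Calling `track_sketch(vault, 100, "a", created=1)` on an empty `vault` leaves one log entry and one box under the same timestamp, which is why a repeated timestamp has to be rejected.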
        """
        if debug:
            print('track', f'unscaled_value={unscaled_value}, debug={debug}')
        if created is None:
            created = self.time()
        no_lock = self.nolock()
        self.lock()
        if not self.account_exists(account):
            if debug:
                print(f"account {account} created")
            self._vault['account'][account] = {
                'balance': 0,
                'box': {},
                'count': 0,
                'log': {},
                'hide': False,
                'zakatable': True,
            }
            self._step(Action.CREATE, account)
        if unscaled_value == 0:
            if no_lock:
                self.free(self.lock())
            return 0
        value = self.scale(unscaled_value)
        if logging:
            self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug)
        if debug:
            print('create-box', created)
        if self.box_exists(account, created):
            raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).")
        if debug:
            print('created-box', created)
        self._vault['account'][account]['box'][created] = {
            'capital': value,
            'count': 0,
            'last': 0,
            'rest': value,
            'total': 0,
        }
        self._step(Action.TRACK, account, ref=created, value=value)
        if no_lock:
            self.free(self.lock())
        return created

    def log_exists(self, account: str, ref: int) -> bool:
        """
        Checks if a specific transaction log entry exists for a given account.

        Parameters:
        account (str): The account number associated with the transaction log.
        ref (int): The reference to the transaction log entry.

        Returns:
        bool: True if the transaction log entry exists, False otherwise.
        """
        return self.ref_exists(account, 'log', ref)

    def _log(self, value: float, desc: str = '', account: str = 1, created: int = None, ref: int = None,
             debug: bool = False) -> int:
        """
        Log a transaction into the account's log.

        Parameters:
        value (float): The value of the transaction.
        desc (str): The description of the transaction.
        account (str): The account to log the transaction into. Default is '1'.
        created (int): The timestamp of the transaction. If not provided, it will be generated.
        ref (int): The reference of the object.
        debug (bool): Whether to print debug information. Default is False.

        Returns:
        int: The timestamp of the logged transaction.

        This method updates the account's balance, count, and log with the transaction details.
        It also creates a step in the history of the transaction.

        Raises:
        ValueError: The log transaction happened again in the same nanosecond time.
        """
        if debug:
            print('_log', f'debug={debug}')
        if created is None:
            created = self.time()
        try:
            self._vault['account'][account]['balance'] += value
        except TypeError:
            self._vault['account'][account]['balance'] += Decimal(value)
        self._vault['account'][account]['count'] += 1
        if debug:
            print('create-log', created)
        if self.log_exists(account, created):
            raise ValueError(f"The log transaction happened again in the same nanosecond time({created}).")
        if debug:
            print('created-log', created)
        self._vault['account'][account]['log'][created] = {
            'value': value,
            'desc': desc,
            'ref': ref,
            'file': {},
        }
        self._step(Action.LOG, account, ref=created, value=value)
        return created

    def exchange(self, account, created: int = None, rate: float = None, description: str = None,
                 debug: bool = False) -> dict:
        """
        This method is used to record or retrieve exchange rates for a specific account.

        Parameters:
        - account (str): The account number for which the exchange rate is being recorded or retrieved.
        - created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used.
        - rate (float): The exchange rate to be recorded.
          If not provided, the method will retrieve the latest exchange rate.
        - description (str): A description of the exchange rate.
        - debug (bool): Whether to print debug information. Default is False.

        Returns:
        - dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found,
        it returns a dictionary with default values for the rate and description.
        """
        if debug:
            print('exchange', f'debug={debug}')
        if created is None:
            created = self.time()
        no_lock = self.nolock()
        self.lock()
        if rate is not None:
            if rate <= 0:
                return dict()
            if account not in self._vault['exchange']:
                self._vault['exchange'][account] = {}
            if len(self._vault['exchange'][account]) == 0 and rate <= 1:
                return {"time": created, "rate": 1, "description": None}
            self._vault['exchange'][account][created] = {"rate": rate, "description": description}
            self._step(Action.EXCHANGE, account, ref=created, value=rate)
            if no_lock:
                self.free(self.lock())
            if debug:
                print("exchange-created-1",
                      f'account: {account}, created: {created}, rate:{rate}, description:{description}')

        if account in self._vault['exchange']:
            valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created]
            if valid_rates:
                latest_rate = max(valid_rates, key=lambda x: x[0])
                if debug:
                    print("exchange-read-1",
                          f'account: {account}, created: {created}, rate:{rate}, description:{description}',
                          'latest_rate', latest_rate)
                result = latest_rate[1]
                result['time'] = latest_rate[0]
                return result  # return a dictionary containing the rate and the description
        if debug:
            print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}')
        return {"time": created, "rate": 1, "description": None}  # return the default value with an empty description

    @staticmethod
    def exchange_calc(x: float, x_rate: float, y_rate: float) -> float:
        """
        This function calculates the exchanged amount of a currency.

        Args:
            x (float): The original amount of the currency.
            x_rate (float): The exchange rate of the original currency.
            y_rate (float): The exchange rate of the target currency.

        Returns:
            float: The exchanged amount of the target currency.
        """
        return (x * x_rate) / y_rate

    def exchanges(self) -> dict:
        """
        Retrieve the recorded exchange rates for all accounts.

        Parameters:
        None

        Returns:
        dict: A dictionary containing all recorded exchange rates.
        The keys are account names or numbers, and the values are dictionaries containing the exchange rates.
        Each exchange rate dictionary has timestamps as keys and exchange rate details as values.
        """
        return self._vault['exchange'].copy()

    def accounts(self) -> dict:
        """
        Returns a dictionary containing account numbers as keys and their respective balances as values.

        Parameters:
        None

        Returns:
        dict: A dictionary where keys are account numbers and values are their respective balances.
        """
        result = {}
        for i in self._vault['account']:
            result[i] = self._vault['account'][i]['balance']
        return result

    def boxes(self, account) -> dict:
        """
        Retrieve the boxes (transactions) associated with a specific account.

        Parameters:
        account (str): The account number for which to retrieve the boxes.

        Returns:
        dict: A dictionary containing the boxes associated with the given account.
        If the account does not exist, an empty dictionary is returned.
        """
        if self.account_exists(account):
            return self._vault['account'][account]['box']
        return {}

    def logs(self, account) -> dict:
        """
        Retrieve the logs (transactions) associated with a specific account.

        Parameters:
        account (str): The account number for which to retrieve the logs.

        Returns:
        dict: A dictionary containing the logs associated with the given account.
        If the account does not exist, an empty dictionary is returned.
        """
        if self.account_exists(account):
            return self._vault['account'][account]['log']
        return {}

    def daily_logs_init(self) -> dict[str, dict]:
        """
        Initialize a dictionary to store daily, weekly, monthly, and yearly logs.

        Returns:
        dict: A dictionary with keys 'daily', 'weekly', 'monthly', and 'yearly', each containing an empty dictionary.
            Later each key maps to another dictionary, which will store the logs for the corresponding time period.
        """
        return {
            'daily': {},
            'weekly': {},
            'monthly': {},
            'yearly': {},
        }

    def daily_logs(self, weekday: WeekDay = WeekDay.Friday, debug: bool = False):
        """
        Retrieve the daily logs (transactions) from all accounts.

        The function groups the logs by day, week, month, and year, and calculates the total value for each group.
        It returns a dictionary with the keys 'daily', 'weekly', 'monthly', and 'yearly'; each maps a period key
        to its positive, negative, and total values, and each daily entry also lists the log rows for that day.

        Parameters:
        weekday (WeekDay): The weekday used to anchor the weekly grouping. Default is WeekDay.Friday.
        debug (bool): Whether to print debug information. Default is False.

        Returns:
        dict: A dictionary containing the daily logs.
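
Illustrative sketch (not part of the library): the four grouping keys can be derived from a single `datetime`, following the expressions used in the method body. `bucket_keys` and `weekday_value` are hypothetical names; `weekday_value` stands in for `weekday.value`, whose actual numbering is not shown here.

```python
from datetime import datetime, timedelta


def bucket_keys(dt: datetime, weekday_value: int) -> tuple:
    # Day and month keys are zero-padded strings; the year key is an int.
    daily = f"{dt.year}-{dt.month:02d}-{dt.day:02d}"
    monthly = f"{dt.year}-{dt.month:02d}"
    yearly = dt.year
    # The weekly key is a datetime shifted back by weekday_value days,
    # so it keeps the time-of-day of the original timestamp.
    weekly = dt - timedelta(days=weekday_value)
    return daily, weekly, monthly, yearly
```

Because the weekly key keeps the time of day, two logs from the same week but different moments would get different weekly keys under this scheme.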

        Example:
        >>> tracker = ZakatTracker()
        >>> tracker.sub(51, 'desc', 'account1')
        >>> ref = tracker.track(100, 'desc', 'account2')
        >>> tracker.add_file('account2', ref, 'file_0')
        >>> tracker.add_file('account2', ref, 'file_1')
        >>> tracker.add_file('account2', ref, 'file_2')
        >>> tracker.daily_logs()
        {
            'daily': {
                '2024-06-30': {
                    'positive': 100,
                    'negative': 51,
                    'total': 49,
                    'rows': [
                        {
                            'account': 'account1',
                            'desc': 'desc',
                            'file': {},
                            'ref': None,
                            'value': -51,
                            'time': 1690977015000000000,
                            'transfer': False,
                        },
                        {
                            'account': 'account2',
                            'desc': 'desc',
                            'file': {
                                1722919011626770944: 'file_0',
                                1722919011626812928: 'file_1',
                                1722919011626846976: 'file_2',
                            },
                            'ref': None,
                            'value': 100,
                            'time': 1690977015000000000,
                            'transfer': False,
                        },
                    ],
                },
            },
            'weekly': {
                datetime: {
                    'positive': 100,
                    'negative': 51,
                    'total': 49,
                },
            },
            'monthly': {
                '2024-06': {
                    'positive': 100,
                    'negative': 51,
                    'total': 49,
                },
            },
            'yearly': {
                2024: {
                    'positive': 100,
                    'negative': 51,
                    'total': 49,
                },
            },
        }
        """
        logs = {}
        for account in self.accounts():
            for k, v in self.logs(account).items():
                v['time'] = k
                v['account'] = account
                if k not in logs:
                    logs[k] = []
                logs[k].append(v)
        if debug:
            print('logs', logs)
        y = self.daily_logs_init()
        for i in sorted(logs, reverse=True):
            dt = self.time_to_datetime(i)
            daily = f'{dt.year}-{dt.month:02d}-{dt.day:02d}'
            weekly = dt - timedelta(days=weekday.value)
            monthly = f'{dt.year}-{dt.month:02d}'
            yearly = dt.year
            # daily
            if daily not in y['daily']:
                y['daily'][daily] = {
                    'positive': 0,
                    'negative': 0,
                    'total': 0,
                    'rows': [],
                }
            transfer = len(logs[i]) > 1
            if debug:
                print('logs[i]', logs[i])
            for z in logs[i]:
                if debug:
                    print('z', z)
                # daily
                value = z['value']
                if value > 0:
                    y['daily'][daily]['positive'] += value
                else:
                    y['daily'][daily]['negative'] += -value
                y['daily'][daily]['total'] += value
                z['transfer'] = transfer
                y['daily'][daily]['rows'].append(z)
                # weekly
                if weekly not in y['weekly']:
                    y['weekly'][weekly] = {
                        'positive': 0,
                        'negative': 0,
                        'total': 0,
                    }
                if value > 0:
                    y['weekly'][weekly]['positive'] += value
                else:
                    y['weekly'][weekly]['negative'] += -value
                y['weekly'][weekly]['total'] += value
                # monthly
                if monthly not in y['monthly']:
                    y['monthly'][monthly] = {
                        'positive': 0,
                        'negative': 0,
                        'total': 0,
                    }
                if value > 0:
                    y['monthly'][monthly]['positive'] += value
                else:
                    y['monthly'][monthly]['negative'] += -value
                y['monthly'][monthly]['total'] += value
                # yearly
                if yearly not in y['yearly']:
                    y['yearly'][yearly] = {
                        'positive': 0,
                        'negative': 0,
                        'total': 0,
                    }
                if value > 0:
                    y['yearly'][yearly]['positive'] += value
                else:
                    y['yearly'][yearly]['negative'] += -value
                y['yearly'][yearly]['total'] += value
        if debug:
            print('y', y)
        return y

    def add_file(self, account: str, ref: int, path: str) -> int:
        """
        Adds a file reference to a specific transaction log entry in the vault.

        Parameters:
        account (str): The account number associated with the transaction log.
        ref (int): The reference to the transaction log entry.
        path (str): The path of the file to be added.

        Returns:
        int: The reference of the added file. If the account or transaction log entry does not exist, returns 0.
        """
        if self.account_exists(account):
            if ref in self._vault['account'][account]['log']:
                file_ref = self.time()
                self._vault['account'][account]['log'][ref]['file'][file_ref] = path
                no_lock = self.nolock()
                self.lock()
                self._step(Action.ADD_FILE, account, ref=ref, file=file_ref)
                if no_lock:
                    self.free(self.lock())
                return file_ref
        return 0

    def remove_file(self, account: str, ref: int, file_ref: int) -> bool:
        """
        Removes a file reference from a specific transaction log entry in the vault.

        Parameters:
        account (str): The account number associated with the transaction log.
        ref (int): The reference to the transaction log entry.
        file_ref (int): The reference of the file to be removed.

        Returns:
        bool: True if the file reference is successfully removed, False otherwise.
        """
        if self.account_exists(account):
            if ref in self._vault['account'][account]['log']:
                if file_ref in self._vault['account'][account]['log'][ref]['file']:
                    x = self._vault['account'][account]['log'][ref]['file'][file_ref]
                    del self._vault['account'][account]['log'][ref]['file'][file_ref]
                    no_lock = self.nolock()
                    self.lock()
                    self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x)
                    if no_lock:
                        self.free(self.lock())
                    return True
        return False

    def balance(self, account: str = 1, cached: bool = True) -> int:
        """
        Calculate and return the balance of a specific account.

        Parameters:
        account (str): The account number. Default is '1'.
        cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True.

        Returns:
        int: The balance of the account.

        Note:
        If cached is True, the function returns the cached balance.
        If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items.
        """
        if cached:
            return self._vault['account'][account]['balance']
        # Sum the remaining value of every box; an account with no boxes yields 0.
        return sum(y['rest'] for y in self._vault['account'][account]['box'].values())

    def hide(self, account, status: bool = None) -> bool:
        """
        Check or set the hide status of a specific account.

        Parameters:
        account (str): The account number.
        status (bool, optional): The new hide status. If not provided, the function will return the current status.

        Returns:
        bool: The current or updated hide status of the account.

        Raises:
        None

        Example:
        >>> tracker = ZakatTracker()
        >>> ref = tracker.track(51, 'desc', 'account1')
        >>> tracker.hide('account1')  # Get the hide status of 'account1' (False by default)
        False
        >>> tracker.hide('account1', True)  # Set the hide status of 'account1' to True
        True
        >>> tracker.hide('account1')  # Get the updated hide status of 'account1'
        True
        >>> tracker.hide('account1', False)
        False
        """
        if self.account_exists(account):
            if status is None:
                return self._vault['account'][account]['hide']
            self._vault['account'][account]['hide'] = status
            return status
        return False

    def zakatable(self, account, status: bool = None) -> bool:
        """
        Check or set the zakatable status of a specific account.

        Parameters:
        account (str): The account number.
        status (bool, optional): The new zakatable status. If not provided, the function will return the current status.

        Returns:
        bool: The current or updated zakatable status of the account.

        Raises:
        None

        Example:
        >>> tracker = ZakatTracker()
        >>> ref = tracker.track(51, 'desc', 'account1')
        >>> tracker.zakatable('account1')  # Get the zakatable status of 'account1' (True by default)
        True
        >>> tracker.zakatable('account1', True)  # Set the zakatable status of 'account1' to True
        True
        >>> tracker.zakatable('account1')  # Get the current zakatable status of 'account1'
        True
        >>> tracker.zakatable('account1', False)
        False
        """
        if self.account_exists(account):
            if status is None:
                return self._vault['account'][account]['zakatable']
            self._vault['account'][account]['zakatable'] = status
            return status
        return False

    def sub(self, unscaled_value: float | int | Decimal, desc: str = '', account: str = 1, created: int = None,
            debug: bool = False) \
            -> tuple[
                int,
                list[
                    tuple[int, int],
                ],
            ] | tuple:
        """
        Subtracts a specified value from an account's balance.

        Parameters:
        unscaled_value (float | int | Decimal): The amount to be subtracted.
        desc (str): A description for the transaction. Defaults to an empty string.
        account (str): The account from which the value will be subtracted. Defaults to '1'.
        created (int): The timestamp of the transaction. If not provided, the current timestamp will be used.
        debug (bool): A flag indicating whether to print debug information. Defaults to False.

        Returns:
        tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction.

        If the amount to subtract is greater than the account's balance,
        the remaining amount will be transferred to a new transaction with a negative value.

        Raises:
        ValueError: The box transaction happened again in the same nanosecond time.
        ValueError: The log transaction happened again in the same nanosecond time.
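
Illustrative sketch (not part of the library): the subtraction walks the boxes from the newest timestamp to the oldest, draining each `rest` until the amount is covered, and records any shortfall as a new negative box. `consume_newest_first` is a hypothetical helper over a plain `{timestamp: rest}` mapping; logging, scaling, locking, and history are omitted, and `created` is assumed not to collide with an existing box.

```python
def consume_newest_first(boxes: dict, amount: int, created: int) -> list:
    # boxes maps box timestamp -> remaining value ('rest'); it is mutated in place.
    ages = []
    target = amount
    for ts in sorted(boxes, reverse=True):  # newest box first
        if target == 0:
            break
        rest = boxes[ts]
        if rest >= target:
            # This box covers everything that is still owed.
            boxes[ts] -= target
            ages.append((ts, target))
            target = 0
        elif rest > 0:
            # Drain the box completely and keep looking in older boxes.
            boxes[ts] = 0
            ages.append((ts, rest))
            target -= rest
    if target > 0:
        # Overdraw: the uncovered remainder becomes a new negative box.
        boxes[created] = -target
        ages.append((created, target))
    return ages
```

With boxes `{1: 30, 2: 50}` and an amount of 60, the newer box 2 is emptied first and the remaining 10 comes from box 1, giving `[(2, 50), (1, 10)]`.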
        """
        if debug:
            print('sub', f'debug={debug}')
        if unscaled_value < 0:
            return tuple()
        if unscaled_value == 0:
            ref = self.track(unscaled_value, '', account)
            return ref, ref
        if created is None:
            created = self.time()
        no_lock = self.nolock()
        self.lock()
        self.track(0, '', account)
        value = self.scale(unscaled_value)
        self._log(value=-value, desc=desc, account=account, created=created, ref=None, debug=debug)
        ids = sorted(self._vault['account'][account]['box'].keys())
        limit = len(ids) + 1
        target = value
        if debug:
            print('ids', ids)
        ages = []
        for i in range(-1, -limit, -1):
            if target == 0:
                break
            j = ids[i]
            if debug:
                print('i', i, 'j', j)
            rest = self._vault['account'][account]['box'][j]['rest']
            if rest >= target:
                self._vault['account'][account]['box'][j]['rest'] -= target
                self._step(Action.SUB, account, ref=j, value=target)
                ages.append((j, target))
                target = 0
                break
            elif target > rest > 0:
                chunk = rest
                target -= chunk
                self._step(Action.SUB, account, ref=j, value=chunk)
                ages.append((j, chunk))
                self._vault['account'][account]['box'][j]['rest'] = 0
        if target > 0:
            self.track(
                unscaled_value=self.unscale(-target),
                desc=desc,
                account=account,
                logging=False,
                created=created,
            )
            ages.append((created, target))
        if no_lock:
            self.free(self.lock())
        return created, ages

    def transfer(self, unscaled_amount: float | int | Decimal, from_account: str, to_account: str, desc: str = '',
                 created: int = None,
                 debug: bool = False) -> list[int]:
        """
        Transfers a specified value from one account to another.

        Parameters:
        unscaled_amount (float | int | Decimal): The amount to be transferred.
        from_account (str): The account from which the value will be transferred.
        to_account (str): The account to which the value will be transferred.
        desc (str, optional): A description for the transaction. Defaults to an empty string.
        created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used.
        debug (bool): A flag indicating whether to print debug information. Defaults to False.

        Returns:
        list[int]: A list of timestamps corresponding to the transactions made during the transfer.

        Raises:
        ValueError: Transfer to the same account is forbidden.
        ValueError: The box transaction happened again in the same nanosecond time.
        ValueError: The log transaction happened again in the same nanosecond time.
        """
        if debug:
            print('transfer', f'debug={debug}')
        if from_account == to_account:
            raise ValueError(f'Transfer to the same account is forbidden. {to_account}')
        if unscaled_amount <= 0:
            return []
        if created is None:
            created = self.time()
        (_, ages) = self.sub(unscaled_amount, desc, from_account, created, debug=debug)
        times = []
        source_exchange = self.exchange(from_account, created)
        target_exchange = self.exchange(to_account, created)

        if debug:
            print('ages', ages)

        for age, value in ages:
            target_amount = int(self.exchange_calc(value, source_exchange['rate'], target_exchange['rate']))
            if debug:
                print('target_amount', target_amount)
            # Perform the transfer
            if self.box_exists(to_account, age):
                if debug:
                    print('box_exists', age)
                capital = self._vault['account'][to_account]['box'][age]['capital']
                rest = self._vault['account'][to_account]['box'][age]['rest']
                if debug:
                    print(
                        f"Transfer(loop) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).")
                selected_age = age
                if rest + target_amount > capital:
                    self._vault['account'][to_account]['box'][age]['capital'] += target_amount
                    selected_age = ZakatTracker.time()
                self._vault['account'][to_account]['box'][age]['rest'] += target_amount
                self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount)
                y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account,
                              created=None, ref=None, debug=debug)
                times.append((age, y))
                continue
            if debug:
                print(
                    f"Transfer(func) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).")
            y = self.track(
                unscaled_value=self.unscale(int(target_amount)),
                desc=desc,
                account=to_account,
                logging=True,
                created=age,
                debug=debug,
            )
            times.append(y)
        return times

    def check(self,
              silver_gram_price: float,
              unscaled_nisab: float | int | Decimal = None,
              debug: bool = False,
              now: int = None,
              cycle: float = None) -> tuple:
        """
        Check the eligibility for Zakat based on the given parameters.

        Parameters:
        silver_gram_price (float): The price of a gram of silver.
        unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided,
                        it will be calculated based on the silver_gram_price.
        debug (bool): Flag to enable debug mode.
        now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time().
        cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle().

        Returns:
        tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics,
        and a dictionary containing the Zakat plan.
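
Illustrative sketch (not part of the library): for a box at or above the Nisab, the amount due is accumulated once per completed cycle, each time on what remains after the earlier cuts. `zakat_due_sketch` is a hypothetical name, and the 2.5% `rate` is an assumption standing in for `ZakatTracker.ZakatCut`, whose definition is not shown in this section.

```python
from math import floor


def zakat_due_sketch(rest: float, box_time: int, now: int, cycle: float,
                     last: int = 0, rate: float = 0.025) -> tuple:
    # Count cycles from the last zakat time if there is one, else from the box time.
    start = last if last > 0 else box_time
    epochs = floor((now - start) / cycle)
    total = 0.0
    for _ in range(epochs):
        # Each cycle takes the cut from what is left after earlier cycles.
        total += rate * (rest - total)
    return epochs, total
```

A box that has not completed a full cycle yields `(0, 0.0)` and is skipped; a box of 1000 held for two cycles yields roughly 25 + 24.375 under the assumed rate.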
        """
        if debug:
            print('check', f'debug={debug}')
        if now is None:
            now = self.time()
        if cycle is None:
            cycle = ZakatTracker.TimeCycle()
        if unscaled_nisab is None:
            unscaled_nisab = ZakatTracker.Nisab(silver_gram_price)
        nisab = self.scale(unscaled_nisab)
        plan = {}
        below_nisab = 0
        brief = [0, 0, 0]
        valid = False
        if debug:
            print('exchanges', self.exchanges())
        for x in self._vault['account']:
            if not self.zakatable(x):
                continue
            _box = self._vault['account'][x]['box']
            _log = self._vault['account'][x]['log']
            limit = len(_box) + 1
            ids = sorted(self._vault['account'][x]['box'].keys())
            for i in range(-1, -limit, -1):
                j = ids[i]
                rest = float(_box[j]['rest'])
                if rest <= 0:
                    continue
                exchange = self.exchange(x, created=self.time())
                rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1)
                brief[0] += rest
                index = limit + i - 1
                epoch = (now - j) / cycle
                if debug:
                    print(f"Epoch: {epoch}", _box[j])
                if _box[j]['last'] > 0:
                    epoch = (now - _box[j]['last']) / cycle
                if debug:
                    print(f"Epoch: {epoch}")
                epoch = floor(epoch)
                if debug:
                    print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch)
                if epoch == 0:
                    continue
                if debug:
                    print("Epoch - PASSED")
                brief[1] += rest
                if rest >= nisab:
                    total = 0
                    for _ in range(epoch):
                        total += ZakatTracker.ZakatCut(float(rest) - float(total))
                    if total > 0:
                        if x not in plan:
                            plan[x] = {}
                        valid = True
                        brief[2] += total
                        plan[x][index] = {
                            'total': total,
                            'count': epoch,
                            'box_time': j,
                            'box_capital': _box[j]['capital'],
                            'box_rest': _box[j]['rest'],
                            'box_last': _box[j]['last'],
                            'box_total': _box[j]['total'],
                            'box_count': _box[j]['count'],
                            'box_log': _log[j]['desc'],
                            'exchange_rate': exchange['rate'],
                            'exchange_time': exchange['time'],
                            'exchange_desc': exchange['description'],
                        }
                else:
                    chunk = ZakatTracker.ZakatCut(float(rest))
                    if chunk > 0:
                        if x not in plan:
                            plan[x] = {}
                        if j not in plan[x].keys():
                            plan[x][index] = {}
                        below_nisab += rest
                        brief[2] += chunk
                        plan[x][index]['below_nisab'] = chunk
                        plan[x][index]['total'] = chunk
                        plan[x][index]['count'] = epoch
                        plan[x][index]['box_time'] = j
                        plan[x][index]['box_capital'] = _box[j]['capital']
                        plan[x][index]['box_rest'] = _box[j]['rest']
                        plan[x][index]['box_last'] = _box[j]['last']
                        plan[x][index]['box_total'] = _box[j]['total']
                        plan[x][index]['box_count'] = _box[j]['count']
                        plan[x][index]['box_log'] = _log[j]['desc']
                        plan[x][index]['exchange_rate'] = exchange['rate']
                        plan[x][index]['exchange_time'] = exchange['time']
                        plan[x][index]['exchange_desc'] = exchange['description']
        valid = valid or below_nisab >= nisab
        if debug:
            print(f"below_nisab({below_nisab}) >= nisab({nisab})")
        return valid, brief, plan

    def build_payment_parts(self, scaled_demand: int, positive_only: bool = True) -> dict:
        """
        Build payment parts for the Zakat distribution.

        Parameters:
        scaled_demand (int): The total demand for payment in local currency.
        positive_only (bool): If True, only consider accounts with positive balance. Default is True.

        Returns:
        dict: A dictionary containing the payment parts for each account. The dictionary has the following structure:
            {
                'account': {
                    'account_id': {'balance': float, 'rate': float, 'part': float},
                    ...
                },
                'exceed': bool,
                'demand': int,
                'total': float,
            }
        """
        total = 0
        parts = {
            'account': {},
            'exceed': False,
            'demand': int(round(scaled_demand)),
        }
        for x, y in self.accounts().items():
            if positive_only and y <= 0:
                continue
            total += float(y)
            exchange = self.exchange(x)
            parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0}
        parts['total'] = total
        return parts

    @staticmethod
    def check_payment_parts(parts: dict, debug: bool = False) -> int:
        """
        Checks the validity of payment parts.

        Parameters:
        parts (dict): A dictionary containing payment parts information.
        debug (bool): Flag to enable debug mode.

        Returns:
        int: Returns 0 if the payment parts are valid, otherwise returns the error code.

        Error Codes:
        1: 'demand', 'account', 'total', or 'exceed' key is missing in parts.
        2: 'balance', 'rate' or 'part' key is missing in parts['account'][x].
        3: 'part' value in parts['account'][x] is less than 0.
        4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0.
        5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value.
        6: The sum of 'part' values in parts['account'] does not match with 'demand' value.
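
Illustrative sketch (not part of the library): error code 6 amounts to converting every `part` into the local currency with its account `rate` and comparing the rounded sum against `demand`. `parts_cover_demand` is a hypothetical helper; the local `exchange_calc` copies the formula of the static method in this class.

```python
def exchange_calc(x: float, x_rate: float, y_rate: float) -> float:
    # Same formula as ZakatTracker.exchange_calc in this file.
    return (x * x_rate) / y_rate


def parts_cover_demand(parts: dict) -> bool:
    # Convert each part to the local currency (rate 1) and sum the results.
    covered = sum(
        exchange_calc(entry["part"], entry["rate"], 1)
        for entry in parts["account"].values()
    )
    # Compare to two decimal places, as the check does before reporting code 6.
    return round(covered, 2) == round(parts["demand"], 2)
```

For example, a part of 20 at rate 1 plus a part of 8 at rate 3.75 covers a demand of 50 exactly.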
        """
        if debug:
            print('check_payment_parts', f'debug={debug}')
        for i in ['demand', 'account', 'total', 'exceed']:
            if i not in parts:
                return 1
        exceed = parts['exceed']
        for x in parts['account']:
            for j in ['balance', 'rate', 'part']:
                if j not in parts['account'][x]:
                    return 2
            # value checks run only after all required keys are known to exist
            if parts['account'][x]['part'] < 0:
                return 3
            if not exceed and parts['account'][x]['balance'] <= 0:
                return 4
        demand = parts['demand']
        z = 0
        for _, y in parts['account'].items():
            if not exceed and y['part'] > y['balance']:
                return 5
            z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1)
        z = round(z, 2)
        demand = round(demand, 2)
        if debug:
            print('check_payment_parts', f'z = {z}, demand = {demand}')
            print('check_payment_parts', type(z), type(demand))
            print('check_payment_parts', z != demand)
            print('check_payment_parts', str(z) != str(demand))
        if z != demand and str(z) != str(demand):
            return 6
        return 0

    def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool:
        """
        Perform Zakat calculation based on the given report and optional parts.

        Parameters:
        report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan.
        parts (dict): A dictionary containing the payment parts for the zakat.
        debug (bool): A flag indicating whether to print debug information.

        Returns:
        bool: True if the zakat calculation is successful, False otherwise.
        """
        if debug:
            print('zakat', f'debug={debug}')
        valid, _, plan = report
        if not valid:
            return valid
        parts_exist = parts is not None
        if parts_exist:
            if self.check_payment_parts(parts, debug=debug) != 0:
                return False
        if debug:
            print('######### zakat #######')
            print('parts_exist', parts_exist)
        no_lock = self.nolock()
        self.lock()
        report_time = self.time()
        self._vault['report'][report_time] = report
        self._step(Action.REPORT, ref=report_time)
        created = self.time()
        for x in plan:
            target_exchange = self.exchange(x)
            if debug:
                print(plan[x])
                print('-------------')
                print(self._vault['account'][x]['box'])
            ids = sorted(self._vault['account'][x]['box'].keys())
            if debug:
                print('plan[x]', plan[x])
            for i in plan[x].keys():
                j = ids[i]
                if debug:
                    print('i', i, 'j', j)
                self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'],
                           key='last',
                           math_operation=MathOperation.EQUAL)
                self._vault['account'][x]['box'][j]['last'] = created
                amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate']))
                self._vault['account'][x]['box'][j]['total'] += amount
                self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total',
                           math_operation=MathOperation.ADDITION)
                self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count']
                self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count',
                           math_operation=MathOperation.ADDITION)
                if not parts_exist:
                    try:
                        self._vault['account'][x]['box'][j]['rest'] -= amount
                    except TypeError:
                        self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount)
                    # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest',
                    #            math_operation=MathOperation.SUBTRACTION)
                    self._log(-float(amount), desc='zakat-زكاة', account=x,
                              created=None, ref=j, debug=debug)
        if parts_exist:
            for account, part in parts['account'].items():
                if part['part'] == 0:
                    continue
                if debug:
                    print('zakat-part', account, part['rate'])
                target_exchange = self.exchange(account)
                amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate'])
                self.sub(
                    unscaled_value=self.unscale(int(amount)),
                    desc='zakat-part-دفعة-زكاة',
                    account=account,
                    debug=debug,
                )
        if no_lock:
            self.free(self.lock())
        return True

    def export_json(self, path: str = "data.json") -> bool:
        """
        Exports the current state of the ZakatTracker object to a JSON file.

        Parameters:
        path (str): The path where the JSON file will be saved. Default is "data.json".

        Returns:
        bool: True if the export is successful, False otherwise.

        Raises:
        No specific exceptions are raised by this method.
        """
        with open(path, "w") as file:
            json.dump(self._vault, file, indent=4, cls=JSONEncoder)
            return True

    def save(self, path: str = None) -> bool:
        """
        Saves the ZakatTracker's current state to a camel file.

        This method serializes the internal data (`_vault`).

        Parameters:
        path (str, optional): File path for saving. Defaults to a predefined location.

        Returns:
        bool: True if the save operation is successful, False otherwise.
        """
        if path is None:
            path = self.path()
        with open(f'{path}.tmp', 'w') as stream:
            # first save in tmp file
            stream.write(camel.dump(self._vault))
        # then move tmp file to original location
        shutil.move(f'{path}.tmp', path)
        return True

    def load(self, path: str = None) -> bool:
        """
        Load the current state of the ZakatTracker object from a camel file.

        Parameters:
        path (str): The path where the camel file is located.
        If not provided, it will use the default path.

        Returns:
        bool: True if the load operation is successful, False otherwise.
        """
        if path is None:
            path = self.path()
        if os.path.exists(path):
            with open(path, 'r') as stream:
                self._vault = camel.load(stream.read())
                return True
        return False

    def import_csv_cache_path(self):
        """
        Generates the cache file path for imported CSV data.

        This function constructs the file path where cached data from CSV imports
        will be stored. The cache file is a camel file (.camel extension) appended
        to the base path of the object.

        Returns:
        str: The full path to the import CSV cache file.

        Example:
            >>> obj = ZakatTracker('/data/reports')
            >>> obj.import_csv_cache_path()
            '/data/reports.import_csv.camel'
        """
        path = str(self.path())
        ext = self.ext()
        ext_len = len(ext)
        if path.endswith(f'.{ext}'):
            path = path[:-ext_len - 1]
        _, filename = os.path.split(path + f'.import_csv.{ext}')
        return self.base_path(filename)

    def import_csv(self, path: str = 'file.csv', scale_decimal_places: int = 0, debug: bool = False) -> tuple:
        """
        The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system.

        Parameters:
        path (str): The path to the CSV file. Default is 'file.csv'.
        scale_decimal_places (int): The number of decimal places to scale the value. Default is 0.
        debug (bool): A flag indicating whether to print debug information.

        Returns:
        tuple: A tuple containing the number of transactions created, the number of transactions found in the cache,
                and a dictionary of bad transactions.

        Notes:
            * Currency Pair Assumption: This function assumes that the exchange rates stored for each account
                                        are appropriate for the currency pairs involved in the conversions.
            * The exchange rate for each account is based on the last encountered transaction rate that is not equal
                to 1.0 or the previous rate for that account.
            * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent
                transactions of the same account within the whole imported and existing dataset when doing `check` and
                `zakat` operations.

        Example Usage:
            The CSV file should have the following format, rate is optional per transaction:
            account, desc, value, date, rate
            For example:
            safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1
        """
        if debug:
            print('import_csv', f'debug={debug}')
        cache: list[int] = []
        try:
            with open(self.import_csv_cache_path(), 'r') as stream:
                cache = camel.load(stream.read())
        except Exception:
            # a missing or unreadable cache simply means nothing was imported before
            pass
        date_formats = [
            "%Y-%m-%d %H:%M:%S",
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H%M%S",
            "%Y-%m-%d",
        ]
        created, found, bad = 0, 0, {}
        data: dict[int, list] = {}
        with open(path, newline='', encoding="utf-8") as f:
            i = 0
            for row in csv.reader(f, delimiter=','):
                i += 1
                # NOTE: hash() of str values is salted per process unless PYTHONHASHSEED is fixed,
                # so cached hashes only match across runs when the seed is the same.
                hashed = hash(tuple(row))
                if hashed in cache:
                    found += 1
                    continue
                account = row[0]
                desc = row[1]
                value = float(row[2])
                rate = 1.0
                if row[4:5]:  # Empty list if index is out of range
                    rate = float(row[4])
                date: int = 0
                for time_format in date_formats:
                    try:
                        date = self.time(datetime.datetime.strptime(row[3], time_format))
                        break
                    except Exception:
                        # try the next known date format
                        pass
                # TODO: not allowed for negative dates in the future after enhance time functions
                if date == 0:
                    bad[i] = row + ['invalid date']
                if value == 0:
                    bad[i] = row + ['invalid value']
                    continue
                if date not in data:
                    data[date] = []
                data[date].append((i, account, desc, value, date, rate, hashed))

        if debug:
            print('import_csv', len(data))

        if bad:
            return created, found, bad

        for date, rows in sorted(data.items()):
            try:
                len_rows = len(rows)
                if len_rows == 1:
                    (_, account, desc, unscaled_value, date, rate, hashed) = rows[0]
                    value = self.unscale(
                        unscaled_value,
                        decimal_places=scale_decimal_places,
                    ) if scale_decimal_places > 0 else unscaled_value
                    if rate > 0:
                        self.exchange(account=account, created=date, rate=rate)
                    if value > 0:
                        self.track(unscaled_value=value, desc=desc, account=account, logging=True, created=date)
                    elif value < 0:
                        self.sub(unscaled_value=-value, desc=desc, account=account, created=date)
                    created += 1
                    cache.append(hashed)
                    continue
                if debug:
                    print('-- Duplicated time detected', date, 'len', len_rows)
                    print(rows)
                    print('---------------------------------')
                # If records are found at the same time with different accounts in the same amount
                # (one positive and the other negative), this indicates it is a transfer.
                if len_rows != 2:
                    raise Exception(f'more than two transactions({len_rows}) at the same time')
                (i, account1, desc1, unscaled_value1, date1, rate1, _) = rows[0]
                (j, account2, desc2, unscaled_value2, date2, rate2, _) = rows[1]
                if account1 == account2 or desc1 != desc2 or abs(unscaled_value1) != abs(
                        unscaled_value2) or date1 != date2:
                    raise Exception('invalid transfer')
                if rate1 > 0:
                    self.exchange(account1, created=date1, rate=rate1)
                if rate2 > 0:
                    self.exchange(account2, created=date2, rate=rate2)
                value1 = self.unscale(
                    unscaled_value1,
                    decimal_places=scale_decimal_places,
                ) if scale_decimal_places > 0 else unscaled_value1
                value2 = self.unscale(
                    unscaled_value2,
                    decimal_places=scale_decimal_places,
                ) if scale_decimal_places > 0 else unscaled_value2
                values = {
                    value1: account1,
                    value2: account2,
                }
                self.transfer(
                    unscaled_amount=abs(value1),
                    from_account=values[min(values.keys())],
                    to_account=values[max(values.keys())],
                    desc=desc1,
                    created=date1,
                )
            except Exception as e:
                for (i, account, desc, value, date, rate, _) in rows:
                    bad[i] = (account, desc, value, date, rate, e)
                break
        with open(self.import_csv_cache_path(), 'w') as stream:
            stream.write(camel.dump(cache))
        return created, found, bad

    ########
    # TESTS #
    #######

    @staticmethod
    def human_readable_size(size: float, decimal_places: int = 2) -> str:
        """
        Converts a size in bytes to a human-readable format (e.g., KB, MB, GB).

        This function iterates through progressively larger units of information
        (B, KB, MB, GB, etc.) and divides the input size until it fits within a
        range that can be expressed with a reasonable number before the unit.

        Parameters:
        size (float): The size in bytes to convert.
        decimal_places (int, optional): The number of decimal places to display
            in the result. Defaults to 2.

        Returns:
        str: A string representation of the size in a human-readable format,
            rounded to the specified number of decimal places. For example:
                - "1.50 KB" (1536 bytes)
                - "23.00 MB" (24117248 bytes)
                - "1.23 GB" (1325899906 bytes)
        """
        if type(size) not in (float, int):
            raise TypeError("size must be a float or integer")
        if type(decimal_places) != int:
            raise TypeError("decimal_places must be an integer")
        for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
            if size < 1024.0:
                break
            size /= 1024.0
        return f"{size:.{decimal_places}f} {unit}"

    @staticmethod
    def get_dict_size(obj: dict, seen: set = None) -> float:
        """
        Recursively calculates the approximate memory size of a dictionary and its contents in bytes.

        This function traverses the dictionary structure, accounting for the size of keys, values,
        and any nested objects. It handles various data types commonly found in dictionaries
        (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case
        of circular references.

        Parameters:
        obj (dict): The dictionary whose size is to be calculated.
        seen (set, optional): A set used internally to track visited objects
                             and avoid circular references. Defaults to None.

        Returns:
        float: An approximate size of the dictionary and its contents in bytes.

        Note:
        - This function is a method of the `ZakatTracker` class and is likely used to
          estimate the memory footprint of data structures relevant to Zakat calculations.
        - The size calculation is approximate as it relies on `sys.getsizeof()`, which might
          not account for all memory overhead depending on the Python implementation.
        - Circular references are handled to prevent infinite recursion.
        - Basic numeric types (int, float, complex) are assumed to have fixed sizes.
        - String sizes are estimated based on character length and encoding.
        """
        size = 0
        if seen is None:
            seen = set()

        obj_id = id(obj)
        if obj_id in seen:
            return 0

        seen.add(obj_id)
        size += sys.getsizeof(obj)

        if isinstance(obj, dict):
            for k, v in obj.items():
                size += ZakatTracker.get_dict_size(k, seen)
                size += ZakatTracker.get_dict_size(v, seen)
        elif isinstance(obj, (list, tuple, set, frozenset)):
            for item in obj:
                size += ZakatTracker.get_dict_size(item, seen)
        elif isinstance(obj, (int, float, complex)):  # Handle numbers
            pass  # Basic numbers have a fixed size, so nothing to add here
        elif isinstance(obj, str):  # Handle strings
            size += len(obj) * sys.getsizeof(str().encode())  # Size per character in bytes
        return size

    @staticmethod
    def duration_from_nanoseconds(ns: int,
                                  show_zeros_in_spoken_time: bool = False,
                                  spoken_time_separator=',',
                                  millennia: str = 'Millennia',
                                  century: str = 'Century',
                                  years: str = 'Years',
                                  days: str = 'Days',
                                  hours: str = 'Hours',
                                  minutes: str = 'Minutes',
                                  seconds: str = 'Seconds',
                                  milli_seconds: str = 'MilliSeconds',
                                  micro_seconds: str = 'MicroSeconds',
                                  nano_seconds: str = 'NanoSeconds',
                                  ) -> tuple:
        """
        REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106
        Convert NanoSeconds to Human Readable Time Format.
        A nanosecond is a unit of time in the International System of Units (SI) equal
        to one billionth (0.000000001 or 10⁻⁹ or 1⁄1,000,000,000) of a second.
        Its symbol is ns.
        A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond.

        INPUT : ns (AKA: NanoSeconds)
        OUTPUT: tuple(string time_lapsed, string spoken_time) like format.
        OUTPUT Variables: time_lapsed, spoken_time

        Example Input: duration_from_nanoseconds(ns)
        **"Millennium:Century:Years:Days:Hours:Minutes:Seconds::MilliSeconds::MicroSeconds::NanoSeconds"**
        Example Output: ('039:0001:047:325:05:02:03::456::789::012', ' 39 Millennia,   1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds')
        duration_from_nanoseconds(1234567890123456789012)
        """
        us, ns = divmod(ns, 1000)
        ms, us = divmod(us, 1000)
        s, ms = divmod(ms, 1000)
        m, s = divmod(s, 60)
        h, m = divmod(m, 60)
        d, h = divmod(h, 24)
        y, d = divmod(d, 365)
        c, y = divmod(y, 100)
        n, c = divmod(c, 10)
        time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}"
        spoken_time_part = []
        if n > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{n: 3d} {millennia}")
        if c > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{c: 4d} {century}")
        if y > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{y: 3d} {years}")
        if d > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{d: 4d} {days}")
        if h > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{h: 2d} {hours}")
        if m > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{m: 2d} {minutes}")
        if s > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{s: 2d} {seconds}")
        if ms > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{ms: 3d} {milli_seconds}")
        if us > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{us: 3d} {micro_seconds}")
        if ns > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{ns: 3d} {nano_seconds}")
        return time_lapsed, spoken_time_separator.join(spoken_time_part)

    @staticmethod
    def day_to_time(day: int, month: int = 6, year: int = 2024) -> int:  # assumes the month is June and the year is 2024
        """
        Convert a specific day, month, and year into a timestamp.

        Parameters:
        day (int): The day of the month.
        month (int): The month of the year. Default is 6 (June).
        year (int): The year. Default is 2024.

        Returns:
        int: The timestamp representing the given day, month, and year.

        Note:
        This method assumes the default month and year if not provided.
        """
        return ZakatTracker.time(datetime.datetime(year, month, day))

    @staticmethod
    def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime:
        """
        Generate a random date between two given dates.

        Parameters:
        start_date (datetime.datetime): The start date from which to generate a random date.
        end_date (datetime.datetime): The end date until which to generate a random date.

        Returns:
        datetime.datetime: A random date between the start_date and end_date.
        """
        time_between_dates = end_date - start_date
        days_between_dates = time_between_dates.days
        random_number_of_days = random.randrange(days_between_dates)
        return start_date + datetime.timedelta(days=random_number_of_days)

    @staticmethod
    def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False,
                                 debug: bool = False) -> int:
        """
        Generate a random CSV file with specified parameters.

        Parameters:
        path (str): The path where the CSV file will be saved. Default is "data.csv".
        count (int): The number of rows to generate in the CSV file. Default is 1000.
        with_rate (bool): If True, a random rate between 0.12 and 12 is added. Default is False.
        debug (bool): A flag indicating whether to print debug information.

        Returns:
        int: The number of rows written. The function generates a CSV file at the specified path with the given count of rows.
            Each row contains a randomly generated account, description, value, and date.
            The value is randomly generated between 1000 and 100000,
            and the date is randomly generated between 1000-01-01 and 2023-12-31.
            If the row number is not divisible by 13, the value is multiplied by -1.
        """
        if debug:
            print('generate_random_csv_file', f'debug={debug}')
        i = 0
        with open(path, "w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            for i in range(count):
                account = f"acc-{random.randint(1, 1000)}"
                desc = f"Some text {random.randint(1, 1000)}"
                value = random.randint(1000, 100000)
                date = ZakatTracker.generate_random_date(datetime.datetime(1000, 1, 1),
                                                         datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S")
                if not i % 13 == 0:
                    value *= -1
                row = [account, desc, value, date]
                if with_rate:
                    rate = random.randint(1, 100) * 0.12
                    if debug:
                        print('before-append', row)
                    row.append(rate)
                    if debug:
                        print('after-append', row)
                writer.writerow(row)
                i = i + 1
        return i

    @staticmethod
    def create_random_list(max_sum, min_value=0, max_value=10):
        """
        Creates a list of random integers whose sum does not exceed the specified maximum.

        Args:
            max_sum: The maximum allowed sum of the list elements.
            min_value: The minimum possible value for an element (inclusive).
            max_value: The maximum possible value for an element (inclusive).

        Returns:
            A list of random integers.
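
        Example:
        A minimal standalone sketch of the same loop, for illustration only; the name
        `create_random_list_sketch` is hypothetical and not part of this library. With the default
        `min_value=0`, every draw is capped by the remaining amount, so the loop stops only once the
        running sum reaches `max_sum`.

```python
import random


def create_random_list_sketch(max_sum, min_value=0, max_value=10):
    # Standalone copy of the loop, kept outside the class so it runs on its own.
    result = []
    current_sum = 0
    while current_sum < max_sum:
        # Never draw more than what is still missing to reach max_sum.
        next_max_value = min(max_sum - current_sum, max_value)
        next_element = random.randint(min_value, next_max_value)
        result.append(next_element)
        current_sum += next_element
    return result


random.seed(7)  # fixed seed only to make the demo repeatable
parts = create_random_list_sketch(25)
print(parts, sum(parts))
```

        A `min_value` larger than the remaining amount would make `random.randint` raise `ValueError`,
        so the sketch keeps the default of 0.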
        """
        result = []
        current_sum = 0

        while current_sum < max_sum:
            # Calculate the remaining space for the next element
            remaining_sum = max_sum - current_sum
            # Determine the maximum possible value for the next element
            next_max_value = min(remaining_sum, max_value)
            # Generate a random element within the allowed range
            next_element = random.randint(min_value, next_max_value)
            result.append(next_element)
            current_sum += next_element

        return result

    def _test_core(self, restore=False, debug=False):

        if debug:
            random.seed(1234567890)

        test_cases = [
            datetime.datetime(1, 1, 1),
            datetime.datetime(1970, 1, 1),
            datetime.datetime(1969, 12, 31),
            datetime.datetime.now(),
            datetime.datetime(9999, 12, 31, 23, 59, 59),
        ]

        for test_date in test_cases:
            timestamp = ZakatTracker.time(test_date)
            converted = ZakatTracker.time_to_datetime(timestamp)
            if debug:
                print(f"{timestamp} <=> {converted}")
            assert timestamp > 0
            assert test_date.year == converted.year
            assert test_date.month == converted.month
            assert test_date.day == converted.day
            assert test_date.hour == converted.hour
            assert test_date.minute == converted.minute
            assert test_date.second in [converted.second - 1, converted.second, converted.second + 1]

        # sanity check - random forward time

        xlist = []
        limit = 1000
        for _ in range(limit):
            y = ZakatTracker.time()
            z = '-'
            if y not in xlist:
                xlist.append(y)
            else:
                z = 'x'
            if debug:
                print(z, y)
        xx = len(xlist)
        if debug:
            print('count', xx, ' - unique: ', (xx / limit) * 100, '%')
        assert limit == xx

        # sanity check - convert date since 1000AD

        for year in range(1000, 9000):
            ns = ZakatTracker.time(datetime.datetime.strptime(f"{year}-12-30 18:30:45", "%Y-%m-%d %H:%M:%S"))
            date = ZakatTracker.time_to_datetime(ns)
            if debug:
                print(date)
            assert date.year == year
            assert date.month == 12
            assert date.day == 30
            assert date.hour == 18
            assert date.minute == 30
            assert date.second in [44, 45]

        # human_readable_size

        assert ZakatTracker.human_readable_size(0) == "0.00 B"
        assert ZakatTracker.human_readable_size(512) == "512.00 B"
        assert ZakatTracker.human_readable_size(1023) == "1023.00 B"

        assert ZakatTracker.human_readable_size(1024) == "1.00 KB"
        assert ZakatTracker.human_readable_size(2048) == "2.00 KB"
        assert ZakatTracker.human_readable_size(5120) == "5.00 KB"

        assert ZakatTracker.human_readable_size(1024 ** 2) == "1.00 MB"
        assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2) == "2.50 MB"

        assert ZakatTracker.human_readable_size(1024 ** 3) == "1.00 GB"
        assert ZakatTracker.human_readable_size(1024 ** 4) == "1.00 TB"
        assert ZakatTracker.human_readable_size(1024 ** 5) == "1.00 PB"

        assert ZakatTracker.human_readable_size(1536, decimal_places=0) == "2 KB"
        assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2, decimal_places=1) == "2.5 MB"
        assert ZakatTracker.human_readable_size(1234567890, decimal_places=3) == "1.150 GB"

        try:
            # noinspection PyTypeChecker
            ZakatTracker.human_readable_size("not a number")
            assert False, "Expected TypeError for invalid input"
        except TypeError:
            pass

        try:
            # noinspection PyTypeChecker
            ZakatTracker.human_readable_size(1024, decimal_places="not an int")
            assert False, "Expected TypeError for invalid decimal_places"
        except TypeError:
            pass

        # get_dict_size
        assert ZakatTracker.get_dict_size({}) == sys.getsizeof({}), "Empty dictionary size mismatch"
        assert ZakatTracker.get_dict_size({"a": 1, "b": 2.5, "c": True}) != sys.getsizeof({}), "Not Empty dictionary"

        # number scale
        error = 0
        total = 0
        for sign in ['', '-']:
            for max_i, max_j, decimal_places in [
                (101, 101, 2),  # fiat currency minimum unit took 2 decimal places
                (1, 1_000, 8),  # cryptocurrency like Satoshi in Bitcoin took 8 decimal places
                (1, 1_000, 18)  # cryptocurrency like Wei in Ethereum took 18 decimal places
            ]:
                for return_type in (
                        float,
                        Decimal,
                ):
                    for i in range(max_i):
                        for j in range(max_j):
                            total += 1
                            num_str = f'{sign}{i}.{j:0{decimal_places}d}'
                            num = return_type(num_str)
                            scaled = self.scale(num, decimal_places=decimal_places)
                            unscaled = self.unscale(scaled, return_type=return_type, decimal_places=decimal_places)
                            if debug:
                                print(
                                    f'return_type: {return_type}, num_str: {num_str} - num: {num} - scaled: {scaled} - unscaled: {unscaled}')
                            if unscaled != num:
                                if debug:
                                    print('***** SCALE ERROR *****')
                                error += 1
        if debug:
            print(f'total: {total}, error({error}): {100 * error / total}%')
        assert error == 0

        assert self.nolock()
        assert self._history() is True

        table = {
            1: [
                (0, 10, 1000, 1000, 1000, 1, 1),
                (0, 20, 3000, 3000, 3000, 2, 2),
                (0, 30, 6000, 6000, 6000, 3, 3),
                (1, 15, 4500, 4500, 4500, 3, 4),
                (1, 50, -500, -500, -500, 4, 5),
                (1, 100, -10500, -10500, -10500, 5, 6),
            ],
            'wallet': [
                (1, 90, -9000, -9000, -9000, 1, 1),
                (0, 100, 1000, 1000, 1000, 2, 2),
                (1, 190, -18000, -18000, -18000, 3, 3),
                (0, 1000, 82000, 82000, 82000, 4, 4),
            ],
        }
        for x in table:
            for y in table[x]:
                self.lock()
                if y[0] == 0:
                    ref = self.track(
                        unscaled_value=y[1],
                        desc='test-add',
                        account=x,
                        logging=True,
                        created=ZakatTracker.time(),
                        debug=debug,
                    )
                else:
                    (ref, z) = self.sub(
                        unscaled_value=y[1],
                        desc='test-sub',
                        account=x,
                        created=ZakatTracker.time(),
                    )
                    if debug:
                        print('_sub', z,
                              ZakatTracker.time())
                assert ref != 0
                assert len(self._vault['account'][x]['log'][ref]['file']) == 0
                for i in range(3):
                    file_ref = self.add_file(x, ref, 'file_' + str(i))
                    sleep(0.0000001)
                    assert file_ref != 0
                    if debug:
                        print('ref', ref, 'file', file_ref)
                    assert len(self._vault['account'][x]['log'][ref]['file']) == i + 1
                file_ref = self.add_file(x, ref, 'file_' + str(3))
                assert self.remove_file(x, ref, file_ref)
                daily_logs = self.daily_logs(debug=debug)
                if debug:
                    print('daily_logs', daily_logs)
                for k, v in daily_logs.items():
                    assert k
                    assert v
                z = self.balance(x)
                if debug:
                    print("debug-0", z, y)
                assert z == y[2]
                z = self.balance(x, False)
                if debug:
                    print("debug-1", z, y[3])
                assert z == y[3]
                o = self._vault['account'][x]['log']
                z = 0
                for i in o:
                    z += o[i]['value']
                if debug:
                    print("debug-2", z, type(z))
                    print("debug-2", y[4], type(y[4]))
                assert z == y[4]
                if debug:
                    print('debug-2 - PASSED')
                assert self.box_size(x) == y[5]
                assert self.log_size(x) == y[6]
                assert not self.nolock()
                self.free(self.lock())
                assert self.nolock()
            assert self.boxes(x) != {}
            assert self.logs(x) != {}

            assert not self.hide(x)
            assert self.hide(x, False) is False
            assert self.hide(x) is False
            assert self.hide(x, True)
            assert self.hide(x)

            assert self.zakatable(x)
            assert self.zakatable(x, False) is False
            assert self.zakatable(x) is False
            assert self.zakatable(x, True)
            assert self.zakatable(x)

        if restore is True:
            count = len(self._vault['history'])
            if debug:
                print('history-count', count)
            assert count == 10
            # try mode
            for _ in range(count):
                assert self.recall(True, debug)
            count = len(self._vault['history'])
            if debug:
                print('history-count', count)
            assert count == 10
            _accounts = list(table.keys())
            accounts_limit = len(_accounts) + 1
            for i in range(-1, -accounts_limit, -1):
                account = _accounts[i]
                if debug:
                    print(account, len(table[account]))
                transaction_limit = len(table[account]) + 1
                for j in range(-1, -transaction_limit, -1):
                    row = table[account][j]
                    if debug:
                        print(row, self.balance(account), self.balance(account, False))
                    assert self.balance(account) == self.balance(account, False)
                    assert self.balance(account) == row[2]
                    assert self.recall(False, debug)
            assert self.recall(False, debug) is False
            count = len(self._vault['history'])
            if debug:
                print('history-count', count)
            assert count == 0
        self.reset()

    def test(self, debug: bool = False) -> bool:
        if debug:
            print('test', f'debug={debug}')
        try:

            self._test_core(True, debug)
            self._test_core(False, debug)

            assert self._history()

            # Not allowed for duplicate transactions in the same account and time

            created = ZakatTracker.time()
            self.track(100, 'test-1', 'same', True, created)
            failed = False
            try:
                self.track(50, 'test-1', 'same', True, created)
            except:
                failed = True
            assert failed is True

            self.reset()

            # Same account transfer
            for x in [1, 'a', True, 1.8, None]:
                failed = False
                try:
                    self.transfer(1, x, x, 'same-account', debug=debug)
                except:
                    failed = True
                assert failed is True

            # Always preserve box age during transfer

            series: list[tuple] = [
                (30, 4),
                (60, 3),
                (90, 2),
            ]
            case = {
                3000: {
                    'series': series,
                    'rest': 15000,
                },
                6000: {
                    'series': series,
                    'rest': 12000,
                },
                9000: {
                    'series': series,
                    'rest': 9000,
                },
                18000: {
                    'series': series,
                    'rest': 0,
                },
                27000: {
                    'series': series,
                    'rest': -9000,
                },
                36000: {
                    'series': series,
                    'rest': -18000,
                },
            }

            selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle()

            for total in case:
                if debug:
                    print('--------------------------------------------------------')
                    print(f'case[{total}]', case[total])
                for x in case[total]['series']:
                    self.track(
                        unscaled_value=x[0],
                        desc=f"test-{x} ages",
                        account='ages',
                        logging=True,
                        created=selected_time * x[1],
                    )

                unscaled_total = self.unscale(total)
                if debug:
                    print('unscaled_total', unscaled_total)
                refs = self.transfer(
                    unscaled_amount=unscaled_total,
                    from_account='ages',
                    to_account='future',
                    desc='Zakat Movement',
                    debug=debug,
                )

                if debug:
                    print('refs', refs)

                ages_cache_balance = self.balance('ages')
                ages_fresh_balance = self.balance('ages', False)
                rest = case[total]['rest']
                if debug:
                    print('source', ages_cache_balance, ages_fresh_balance, rest)
                assert ages_cache_balance == rest
                assert ages_fresh_balance == rest

                future_cache_balance = self.balance('future')
                future_fresh_balance = self.balance('future', False)
                if debug:
                    print('target', future_cache_balance, future_fresh_balance, total)
                    print('refs', refs)
                assert future_cache_balance == total
                assert future_fresh_balance == total

                # TODO: check boxes times for `ages` should equal box times in `future`
                for ref in self._vault['account']['ages']['box']:
                    ages_capital = self._vault['account']['ages']['box'][ref]['capital']
                    ages_rest = self._vault['account']['ages']['box'][ref]['rest']
                    future_capital = 0
                    if ref in self._vault['account']['future']['box']:
                        future_capital = self._vault['account']['future']['box'][ref]['capital']
                    future_rest = 0
                    if ref in self._vault['account']['future']['box']:
                        future_rest = self._vault['account']['future']['box'][ref]['rest']
                    if ages_capital != 0 and future_capital != 0 and future_rest != 0:
                        if debug:
                            print('================================================================')
                            print('ages', ages_capital, ages_rest)
                            print('future', future_capital, future_rest)
                        if ages_rest == 0:
                            assert ages_capital == future_capital
                        elif ages_rest < 0:
                            assert -ages_capital == future_capital
                        elif ages_rest > 0:
                            assert ages_capital == ages_rest + future_capital
                self.reset()
                assert len(self._vault['history']) == 0

            assert self._history()
            assert self._history(False) is False
            assert self._history() is False
            assert self._history(True)
            assert self._history()
            if debug:
                print('####################################################################')

            transaction = [
                (
                    20, 'wallet', 1, -2000, -2000, -2000, 1, 1,
                    2000, 2000, 2000, 1, 1,
                ),
                (
                    750, 'wallet', 'safe', -77000, -77000, -77000, 2, 2,
                    75000, 75000, 75000, 1, 1,
                ),
                (
                    600, 'safe', 'bank', 15000, 15000, 15000, 1, 2,
                    60000, 60000, 60000, 1, 1,
                ),
            ]
            for z in transaction:
                self.lock()
                x = z[1]
                y = z[2]
                self.transfer(
                    unscaled_amount=z[0],
                    from_account=x,
                    to_account=y,
                    desc='test-transfer',
                    debug=debug,
                )
                zz = self.balance(x)
                if debug:
                    print(zz, z)
                assert zz == z[3]
                xx = self.accounts()[x]
                assert xx == z[3]
                assert self.balance(x, False) == z[4]
                assert xx == z[4]

                s = 0
                log = self._vault['account'][x]['log']
                for i in log:
                    s += log[i]['value']
                if debug:
                    print('s', s, 'z[5]', z[5])
                assert s == z[5]

                assert self.box_size(x) == z[6]
                assert self.log_size(x) == z[7]

                yy = self.accounts()[y]
                assert self.balance(y) == z[8]
                assert yy == z[8]
                assert self.balance(y, False) == z[9]
assert yy == z[9] 2987 2988 s = 0 2989 log = self._vault['account'][y]['log'] 2990 for i in log: 2991 s += log[i]['value'] 2992 assert s == z[10] 2993 2994 assert self.box_size(y) == z[11] 2995 assert self.log_size(y) == z[12] 2996 assert self.free(self.lock()) 2997 2998 if debug: 2999 pp().pprint(self.check(2.17)) 3000 3001 assert not self.nolock() 3002 history_count = len(self._vault['history']) 3003 if debug: 3004 print('history-count', history_count) 3005 assert history_count == 4 3006 assert not self.free(ZakatTracker.time()) 3007 assert self.free(self.lock()) 3008 assert self.nolock() 3009 assert len(self._vault['history']) == 3 3010 3011 # storage 3012 3013 _path = self.path(f'./zakat_test_db/test.{self.ext()}') 3014 if os.path.exists(_path): 3015 os.remove(_path) 3016 self.save() 3017 assert os.path.getsize(_path) > 0 3018 self.reset() 3019 assert self.recall(False, debug) is False 3020 self.load() 3021 assert self._vault['account'] is not None 3022 3023 # recall 3024 3025 assert self.nolock() 3026 assert len(self._vault['history']) == 3 3027 assert self.recall(False, debug) is True 3028 assert len(self._vault['history']) == 2 3029 assert self.recall(False, debug) is True 3030 assert len(self._vault['history']) == 1 3031 assert self.recall(False, debug) is True 3032 assert len(self._vault['history']) == 0 3033 assert self.recall(False, debug) is False 3034 assert len(self._vault['history']) == 0 3035 3036 # exchange 3037 3038 self.exchange("cash", 25, 3.75, "2024-06-25") 3039 self.exchange("cash", 22, 3.73, "2024-06-22") 3040 self.exchange("cash", 15, 3.69, "2024-06-15") 3041 self.exchange("cash", 10, 3.66) 3042 3043 for i in range(1, 30): 3044 exchange = self.exchange("cash", i) 3045 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 3046 if debug: 3047 print(i, rate, description, created) 3048 assert created 3049 if i < 10: 3050 assert rate == 1 3051 assert description is None 3052 elif i == 10: 3053 assert rate == 
3.66 3054 assert description is None 3055 elif i < 15: 3056 assert rate == 3.66 3057 assert description is None 3058 elif i == 15: 3059 assert rate == 3.69 3060 assert description is not None 3061 elif i < 22: 3062 assert rate == 3.69 3063 assert description is not None 3064 elif i == 22: 3065 assert rate == 3.73 3066 assert description is not None 3067 elif i >= 25: 3068 assert rate == 3.75 3069 assert description is not None 3070 exchange = self.exchange("bank", i) 3071 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 3072 if debug: 3073 print(i, rate, description, created) 3074 assert created 3075 assert rate == 1 3076 assert description is None 3077 3078 assert len(self._vault['exchange']) > 0 3079 assert len(self.exchanges()) > 0 3080 self._vault['exchange'].clear() 3081 assert len(self._vault['exchange']) == 0 3082 assert len(self.exchanges()) == 0 3083 3084 # حفظ أسعار الصرف باستخدام التواريخ بالنانو ثانية 3085 self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25") 3086 self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22") 3087 self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15") 3088 self.exchange("cash", ZakatTracker.day_to_time(10), 3.66) 3089 3090 for i in [x * 0.12 for x in range(-15, 21)]: 3091 if i <= 0: 3092 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0 3093 else: 3094 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0 3095 3096 # اختبار النتائج باستخدام التواريخ بالنانو ثانية 3097 for i in range(1, 31): 3098 timestamp_ns = ZakatTracker.day_to_time(i) 3099 exchange = self.exchange("cash", timestamp_ns) 3100 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 3101 if debug: 3102 print(i, rate, description, created) 3103 assert created 3104 if i < 10: 3105 assert rate == 1 3106 assert description is None 3107 elif i == 10: 3108 assert rate == 3.66 3109 assert 
description is None 3110 elif i < 15: 3111 assert rate == 3.66 3112 assert description is None 3113 elif i == 15: 3114 assert rate == 3.69 3115 assert description is not None 3116 elif i < 22: 3117 assert rate == 3.69 3118 assert description is not None 3119 elif i == 22: 3120 assert rate == 3.73 3121 assert description is not None 3122 elif i >= 25: 3123 assert rate == 3.75 3124 assert description is not None 3125 exchange = self.exchange("bank", i) 3126 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 3127 if debug: 3128 print(i, rate, description, created) 3129 assert created 3130 assert rate == 1 3131 assert description is None 3132 3133 # csv 3134 3135 csv_count = 1000 3136 3137 for with_rate, path in { 3138 False: 'test-import_csv-no-exchange', 3139 True: 'test-import_csv-with-exchange', 3140 }.items(): 3141 3142 if debug: 3143 print('test_import_csv', with_rate, path) 3144 3145 csv_path = path + '.csv' 3146 if os.path.exists(csv_path): 3147 os.remove(csv_path) 3148 c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug) 3149 if debug: 3150 print('generate_random_csv_file', c) 3151 assert c == csv_count 3152 assert os.path.getsize(csv_path) > 0 3153 cache_path = self.import_csv_cache_path() 3154 if os.path.exists(cache_path): 3155 os.remove(cache_path) 3156 self.reset() 3157 (created, found, bad) = self.import_csv(csv_path, debug) 3158 bad_count = len(bad) 3159 assert bad_count > 0 3160 if debug: 3161 print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})") 3162 print('bad', bad) 3163 tmp_size = os.path.getsize(cache_path) 3164 assert tmp_size > 0 3165 # TODO: assert created + found + bad_count == csv_count 3166 # TODO: assert created == csv_count 3167 # TODO: assert bad_count == 0 3168 (created_2, found_2, bad_2) = self.import_csv(csv_path) 3169 bad_2_count = len(bad_2) 3170 if debug: 3171 print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})") 3172 print('bad', bad) 3173 
assert bad_2_count > 0 3174 # TODO: assert tmp_size == os.path.getsize(cache_path) 3175 # TODO: assert created_2 + found_2 + bad_2_count == csv_count 3176 # TODO: assert created == found_2 3177 # TODO: assert bad_count == bad_2_count 3178 # TODO: assert found_2 == csv_count 3179 # TODO: assert bad_2_count == 0 3180 # TODO: assert created_2 == 0 3181 3182 # payment parts 3183 3184 positive_parts = self.build_payment_parts(100, positive_only=True) 3185 assert self.check_payment_parts(positive_parts) != 0 3186 assert self.check_payment_parts(positive_parts) != 0 3187 all_parts = self.build_payment_parts(300, positive_only=False) 3188 assert self.check_payment_parts(all_parts) != 0 3189 assert self.check_payment_parts(all_parts) != 0 3190 if debug: 3191 pp().pprint(positive_parts) 3192 pp().pprint(all_parts) 3193 # dynamic discount 3194 suite = [] 3195 count = 3 3196 for exceed in [False, True]: 3197 case = [] 3198 for parts in [positive_parts, all_parts]: 3199 part = parts.copy() 3200 demand = part['demand'] 3201 if debug: 3202 print(demand, part['total']) 3203 i = 0 3204 z = demand / count 3205 cp = { 3206 'account': {}, 3207 'demand': demand, 3208 'exceed': exceed, 3209 'total': part['total'], 3210 } 3211 j = '' 3212 for x, y in part['account'].items(): 3213 x_exchange = self.exchange(x) 3214 zz = self.exchange_calc(z, 1, x_exchange['rate']) 3215 if exceed and zz <= demand: 3216 i += 1 3217 y['part'] = zz 3218 if debug: 3219 print(exceed, y) 3220 cp['account'][x] = y 3221 case.append(y) 3222 elif not exceed and y['balance'] >= zz: 3223 i += 1 3224 y['part'] = zz 3225 if debug: 3226 print(exceed, y) 3227 cp['account'][x] = y 3228 case.append(y) 3229 j = x 3230 if i >= count: 3231 break 3232 if len(cp['account'][j]) > 0: 3233 suite.append(cp) 3234 if debug: 3235 print('suite', len(suite)) 3236 # vault = self._vault.copy() 3237 for case in suite: 3238 # self._vault = vault.copy() 3239 if debug: 3240 print('case', case) 3241 result = self.check_payment_parts(case) 3242 
if debug: 3243 print('check_payment_parts', result, f'exceed: {exceed}') 3244 assert result == 0 3245 3246 report = self.check(2.17, None, debug) 3247 (valid, brief, plan) = report 3248 if debug: 3249 print('valid', valid) 3250 zakat_result = self.zakat(report, parts=case, debug=debug) 3251 if debug: 3252 print('zakat-result', zakat_result) 3253 assert valid == zakat_result 3254 3255 assert self.save(path + f'.{self.ext()}') 3256 assert self.export_json(path + '.json') 3257 3258 assert self.export_json("1000-transactions-test.json") 3259 assert self.save(f"1000-transactions-test.{self.ext()}") 3260 3261 self.reset() 3262 3263 # test transfer between accounts with different exchange rate 3264 3265 a_SAR = "Bank (SAR)" 3266 b_USD = "Bank (USD)" 3267 c_SAR = "Safe (SAR)" 3268 # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer 3269 for case in [ 3270 (0, a_SAR, "SAR Gift", 1000, 100000), 3271 (1, a_SAR, 1), 3272 (0, b_USD, "USD Gift", 500, 50000), 3273 (1, b_USD, 1), 3274 (2, b_USD, 3.75), 3275 (1, b_USD, 3.75), 3276 (3, 100, b_USD, a_SAR, "100 USD -> SAR", 40000, 137500), 3277 (0, c_SAR, "Salary", 750, 75000), 3278 (3, 375, c_SAR, b_USD, "375 SAR -> USD", 37500, 50000), 3279 (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 137125, 50100), 3280 ]: 3281 if debug: 3282 print('case', case) 3283 match (case[0]): 3284 case 0: # track 3285 _, account, desc, x, balance = case 3286 self.track(unscaled_value=x, desc=desc, account=account, debug=debug) 3287 3288 cached_value = self.balance(account, cached=True) 3289 fresh_value = self.balance(account, cached=False) 3290 if debug: 3291 print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value) 3292 assert cached_value == balance 3293 assert fresh_value == balance 3294 case 1: # check-exchange 3295 _, account, expected_rate = case 3296 t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 3297 if debug: 3298 print('t-exchange', t_exchange) 3299 assert t_exchange['rate'] == 
expected_rate 3300 case 2: # do-exchange 3301 _, account, rate = case 3302 self.exchange(account, rate=rate, debug=debug) 3303 b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 3304 if debug: 3305 print('b-exchange', b_exchange) 3306 assert b_exchange['rate'] == rate 3307 case 3: # transfer 3308 _, x, a, b, desc, a_balance, b_balance = case 3309 self.transfer(x, a, b, desc, debug=debug) 3310 3311 cached_value = self.balance(a, cached=True) 3312 fresh_value = self.balance(a, cached=False) 3313 if debug: 3314 print( 3315 'account', a, 3316 'cached_value', cached_value, 3317 'fresh_value', fresh_value, 3318 'a_balance', a_balance, 3319 ) 3320 assert cached_value == a_balance 3321 assert fresh_value == a_balance 3322 3323 cached_value = self.balance(b, cached=True) 3324 fresh_value = self.balance(b, cached=False) 3325 if debug: 3326 print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value) 3327 assert cached_value == b_balance 3328 assert fresh_value == b_balance 3329 3330 # Transfer all in many chunks randomly from B to A 3331 a_SAR_balance = 137125 3332 b_USD_balance = 50100 3333 b_USD_exchange = self.exchange(b_USD) 3334 amounts = ZakatTracker.create_random_list(b_USD_balance, max_value=1000) 3335 if debug: 3336 print('amounts', amounts) 3337 i = 0 3338 for x in amounts: 3339 if debug: 3340 print(f'{i} - transfer-with-exchange({x})') 3341 self.transfer( 3342 unscaled_amount=self.unscale(x), 3343 from_account=b_USD, 3344 to_account=a_SAR, 3345 desc=f"{x} USD -> SAR", 3346 debug=debug, 3347 ) 3348 3349 b_USD_balance -= x 3350 cached_value = self.balance(b_USD, cached=True) 3351 fresh_value = self.balance(b_USD, cached=False) 3352 if debug: 3353 print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3354 b_USD_balance) 3355 assert cached_value == b_USD_balance 3356 assert fresh_value == b_USD_balance 3357 3358 a_SAR_balance += int(x * b_USD_exchange['rate']) 3359 cached_value = 
self.balance(a_SAR, cached=True) 3360 fresh_value = self.balance(a_SAR, cached=False) 3361 if debug: 3362 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3363 a_SAR_balance, 'rate', b_USD_exchange['rate']) 3364 assert cached_value == a_SAR_balance 3365 assert fresh_value == a_SAR_balance 3366 i += 1 3367 3368 # Transfer all in many chunks randomly from C to A 3369 c_SAR_balance = 37500 3370 amounts = ZakatTracker.create_random_list(c_SAR_balance, max_value=1000) 3371 if debug: 3372 print('amounts', amounts) 3373 i = 0 3374 for x in amounts: 3375 if debug: 3376 print(f'{i} - transfer-with-exchange({x})') 3377 self.transfer( 3378 unscaled_amount=self.unscale(x), 3379 from_account=c_SAR, 3380 to_account=a_SAR, 3381 desc=f"{x} SAR -> a_SAR", 3382 debug=debug, 3383 ) 3384 3385 c_SAR_balance -= x 3386 cached_value = self.balance(c_SAR, cached=True) 3387 fresh_value = self.balance(c_SAR, cached=False) 3388 if debug: 3389 print('account', c_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3390 c_SAR_balance) 3391 assert cached_value == c_SAR_balance 3392 assert fresh_value == c_SAR_balance 3393 3394 a_SAR_balance += x 3395 cached_value = self.balance(a_SAR, cached=True) 3396 fresh_value = self.balance(a_SAR, cached=False) 3397 if debug: 3398 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3399 a_SAR_balance) 3400 assert cached_value == a_SAR_balance 3401 assert fresh_value == a_SAR_balance 3402 i += 1 3403 3404 assert self.export_json("accounts-transfer-with-exchange-rates.json") 3405 assert self.save(f"accounts-transfer-with-exchange-rates.{self.ext()}") 3406 3407 # check & zakat with exchange rates for many cycles 3408 3409 for rate, values in { 3410 1: { 3411 'in': [1000, 2000, 10000], 3412 'exchanged': [100000, 200000, 1000000], 3413 'out': [2500, 5000, 73140], 3414 }, 3415 3.75: { 3416 'in': [200, 1000, 5000], 3417 'exchanged': [75000, 375000, 
1875000], 3418 'out': [1875, 9375, 137138], 3419 }, 3420 }.items(): 3421 a, b, c = values['in'] 3422 m, n, o = values['exchanged'] 3423 x, y, z = values['out'] 3424 if debug: 3425 print('rate', rate, 'values', values) 3426 for case in [ 3427 (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3428 {'safe': {0: {'below_nisab': x}}}, 3429 ], False, m), 3430 (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3431 {'safe': {0: {'count': 1, 'total': y}}}, 3432 ], True, n), 3433 (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [ 3434 {'cave': {0: {'count': 3, 'total': z}}}, 3435 ], True, o), 3436 ]: 3437 if debug: 3438 print(f"############# check(rate: {rate}) #############") 3439 print('case', case) 3440 self.reset() 3441 self.exchange(account=case[1], created=case[2], rate=rate) 3442 self.track( 3443 unscaled_value=case[0], 3444 desc='test-check', 3445 account=case[1], 3446 logging=True, 3447 created=case[2], 3448 ) 3449 assert self.snapshot() 3450 3451 # assert self.nolock() 3452 # history_size = len(self._vault['history']) 3453 # print('history_size', history_size) 3454 # assert history_size == 2 3455 assert self.lock() 3456 assert not self.nolock() 3457 report = self.check(2.17, None, debug) 3458 (valid, brief, plan) = report 3459 if debug: 3460 print('brief', brief) 3461 assert valid == case[4] 3462 assert case[5] == brief[0] 3463 assert case[5] == brief[1] 3464 3465 if debug: 3466 pp().pprint(plan) 3467 3468 for x in plan: 3469 assert case[1] == x 3470 if 'total' in case[3][0][x][0].keys(): 3471 assert case[3][0][x][0]['total'] == int(brief[2]) 3472 assert int(plan[x][0]['total']) == case[3][0][x][0]['total'] 3473 assert int(plan[x][0]['count']) == case[3][0][x][0]['count'] 3474 else: 3475 assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab'] 3476 if debug: 3477 pp().pprint(report) 3478 result = self.zakat(report, debug=debug) 3479 if debug: 3480 print('zakat-result', result, case[4]) 3481 assert result == 
case[4] 3482 report = self.check(2.17, None, debug) 3483 (valid, brief, plan) = report 3484 assert valid is False 3485 3486 history_size = len(self._vault['history']) 3487 if debug: 3488 print('history_size', history_size) 3489 assert history_size == 3 3490 assert not self.nolock() 3491 assert self.recall(False, debug) is False 3492 self.free(self.lock()) 3493 assert self.nolock() 3494 3495 for i in range(3, 0, -1): 3496 history_size = len(self._vault['history']) 3497 if debug: 3498 print('history_size', history_size) 3499 assert history_size == i 3500 assert self.recall(False, debug) is True 3501 3502 assert self.nolock() 3503 assert self.recall(False, debug) is False 3504 3505 history_size = len(self._vault['history']) 3506 if debug: 3507 print('history_size', history_size) 3508 assert history_size == 0 3509 3510 account_size = len(self._vault['account']) 3511 if debug: 3512 print('account_size', account_size) 3513 assert account_size == 0 3514 3515 report_size = len(self._vault['report']) 3516 if debug: 3517 print('report_size', report_size) 3518 assert report_size == 0 3519 3520 assert self.nolock() 3521 return True 3522 except Exception as e: 3523 # pp().pprint(self._vault) 3524 assert self.export_json("test-snapshot.json") 3525 assert self.save(f"test-snapshot.{self.ext()}") 3526 raise e
A class for tracking and calculating Zakat.
This class provides functionalities for recording transactions, calculating Zakat due, and managing account balances. It also offers features like importing transactions from CSV files, exporting data to JSON format, and saving/loading the tracker state.
The ZakatTracker class is designed to handle both positive and negative transactions, allowing for flexible tracking of financial activities related to Zakat. It also supports the concepts of "Nisab" (the minimum threshold for Zakat) and "haul" (one complete lunar year elapsed for a transaction), and calculates Zakat due based on the current silver price.
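The interplay of Nisab, haul and the 2.5% cut can be sketched as follows. This is a simplified, standalone illustration and not the library's `check`/`zakat` logic: the helper `zakat_due` and the constants are hypothetical names, and the sketch assumes a single holding that is compared against the Nisab on its own.

```python
NISAB_GRAMS = 595                       # grams of silver (the default used by Nisab)
ZAKAT_RATE = 0.025                      # 2.5% per lunar year (as in ZakatCut)
HAUL_NS = 355 * 24 * 60 * 60 * 10**9    # 355 days in nanoseconds (as in TimeCycle)


def zakat_due(amount: float, age_ns: int, silver_gram_price: float) -> float:
    """Hypothetical helper: Zakat owed on one holding, or 0.0 if none is due."""
    nisab = silver_gram_price * NISAB_GRAMS
    # Nothing is due below the threshold or before a full haul has passed.
    if amount < nisab or age_ns < HAUL_NS:
        return 0.0
    return ZAKAT_RATE * amount


below = zakat_due(1000, HAUL_NS, 2.17)       # under the Nisab at 2.17 per gram
above = zakat_due(2000, HAUL_NS, 2.17)       # over the Nisab, one haul old
too_young = zakat_due(2000, HAUL_NS - 1, 2.17)
print(below, above, too_young)
```

The silver price of 2.17 is the value the test listing passes to `check`; the amounts 1000 and 2000 mirror its "below Nisab" and "one cycle" cases.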
The class uses a camel file as its database to persist the tracker state, ensuring data integrity across sessions. It also provides options for enabling or disabling history tracking, allowing users to choose their preferred level of detail.
In addition, the ZakatTracker class includes various helper methods like time, time_to_datetime, lock, free, recall, export_json, and more. These methods provide additional functionalities and flexibility for interacting with and managing the Zakat tracker.
Attributes:
ZakatTracker.ZakatCut (function): A function to calculate the Zakat percentage.
ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat.
ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price.
ZakatTracker.Version (function): The version of the ZakatTracker class.
Data Structure: The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data.
_vault (dict):
    - account (dict):
        - {account_number} (dict):
            - balance (int): The current balance of the account.
            - box (dict): A dictionary storing transaction details.
                - {timestamp} (dict):
                    - capital (int): The initial amount of the transaction.
                    - count (int): The number of times Zakat has been calculated for this transaction.
                    - last (int): The timestamp of the last Zakat calculation.
                    - rest (int): The remaining amount after Zakat deductions and withdrawal.
                    - total (int): The total Zakat deducted from this transaction.
            - count (int): The total number of transactions for the account.
            - log (dict): A dictionary storing transaction logs.
                - {timestamp} (dict):
                    - value (int): The transaction amount (positive or negative).
                    - desc (str): The description of the transaction.
                    - ref (int): The box reference (positive or None).
                    - file (dict): A dictionary storing file references associated with the transaction.
            - hide (bool): Indicates whether the account is hidden or not.
            - zakatable (bool): Indicates whether the account is subject to Zakat.
    - exchange (dict):
        - account (dict):
            - {timestamps} (dict):
                - rate (float): Exchange rate when compared to local currency.
                - description (str): The description of the exchange rate.
    - history (dict):
        - {timestamp} (list): A list of dictionaries storing the history of actions performed.
            - {action_dict} (dict):
                - action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT).
                - account (str): The account number associated with the action.
                - ref (int): The reference number of the transaction.
                - file (int): The reference number of the file (if applicable).
                - key (str): The key associated with the action (e.g., 'rest', 'total').
                - value (int): The value associated with the action.
                - math (MathOperation): The mathematical operation performed (if applicable).
    - lock (int or None): The timestamp indicating the current lock status (None if not locked).
    - report (dict):
        - {timestamp} (tuple): A tuple storing Zakat report details.
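To make the layout above concrete, here is a hand-built dictionary with one account, one box and one log entry. It is only an illustration of the documented shape: the account name, timestamp and amounts are made up, amounts are assumed to be stored as integers scaled by 100, and the "fresh balance as the sum of box `rest` values" computation is an assumption about how an uncached balance could be derived, not a statement about the library's internals.

```python
created = 1_719_000_000_000_000_000  # illustrative nanosecond timestamp

vault = {
    'account': {
        'safe': {
            'balance': 100000,              # 1000.00 stored as a scaled integer
            'box': {
                created: {
                    'capital': 100000,
                    'count': 0,
                    'last': 0,
                    'rest': 100000,
                    'total': 0,
                },
            },
            'count': 1,
            'log': {
                created: {
                    'value': 100000,
                    'desc': 'deposit',
                    'ref': None,
                    'file': {},
                },
            },
            'hide': False,
            'zakatable': True,
        },
    },
    'exchange': {},
    'history': {},
    'lock': None,
    'report': {},
}

# Assumed consistency rule: the cached balance equals the sum of what
# remains in every box of the account.
safe = vault['account']['safe']
fresh_balance = sum(box['rest'] for box in safe['box'].values())
print(fresh_balance == safe['balance'])
```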
    def __init__(self, db_path: str = "./zakat_db/zakat.camel", history_mode: bool = True):
        """
        Initialize ZakatTracker with database path and history mode.

        Parameters:
        db_path (str): The path to the database file. Default is "./zakat_db/zakat.camel".
        history_mode (bool): The mode for tracking history. Default is True.

        Returns:
        None
        """
        self._base_path = None
        self._vault_path = None
        self._vault = None
        self.reset()
        self._history(history_mode)
        self.path(db_path)
Initialize ZakatTracker with database path and history mode.
Parameters:
db_path (str): The path to the database file. Default is "./zakat_db/zakat.camel".
history_mode (bool): The mode for tracking history. Default is True.
Returns: None
    @staticmethod
    def Version() -> str:
        """
        Returns the current version of the software.

        This function returns a string representing the current version of the software,
        including major, minor, and patch version numbers in the format "X.Y.Z".

        Returns:
        str: The current version of the software.
        """
        return '0.2.96'
Returns the current version of the software.
This function returns a string representing the current version of the software, including major, minor, and patch version numbers in the format "X.Y.Z".
Returns: str: The current version of the software.
    @staticmethod
    def ZakatCut(x: float) -> float:
        """
        Calculates the Zakat amount due on an asset.

        This function calculates the zakat amount due on a given asset value over one lunar year.
        Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth
        that exceeds a certain threshold (Nisab).

        Parameters:
        x: The total value of the asset on which Zakat is to be calculated.

        Returns:
        The amount of Zakat due on the asset, calculated as 2.5% of the asset's value.
        """
        return 0.025 * x  # Zakat Cut in one Lunar Year
Calculates the Zakat amount due on an asset.
This function calculates the zakat amount due on a given asset value over one lunar year. Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth that exceeds a certain threshold (Nisab).
Parameters: x: The total value of the asset on which Zakat is to be calculated.
Returns: The amount of Zakat due on the asset, calculated as 2.5% of the asset's value.
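As a worked example of the 2.5% rule over several years, the sketch below applies the cut once per completed cycle, each time to what remains after the previous deduction. Both `zakat_cut` and `zakat_over_cycles` are standalone, hypothetical helpers, and the compounding rule is an assumption; it does appear consistent with the test listing, which expects a total of 73140 scaled units (731.40) for 10000 held over three cycles.

```python
def zakat_cut(x: float) -> float:
    """Standalone stand-in for ZakatCut: 2.5% of the given value."""
    return 0.025 * x


def zakat_over_cycles(amount: float, cycles: int) -> tuple[float, float]:
    """Hypothetical helper: total Zakat and remaining amount after N cycles."""
    total = 0.0
    rest = amount
    for _ in range(cycles):
        cut = zakat_cut(rest)   # each year's cut is taken from what is left
        total += cut
        rest -= cut
    return total, rest


total, rest = zakat_over_cycles(10000, 3)
print(round(total, 5), round(rest, 5))
```

Year by year this is 250 + 243.75 + 237.65625 = 731.40625, leaving 9268.59375.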
    @staticmethod
    def TimeCycle(days: int = 355) -> int:
        """
        Calculates the approximate duration of a lunar year in nanoseconds.

        This function calculates the approximate duration of a lunar year based on the given number of days.
        It converts the given number of days into nanoseconds for use in high-precision timing applications.

        Parameters:
        days: The number of days in a lunar year. Defaults to 355,
              which is an approximation of the average length of a lunar year.

        Returns:
        The approximate duration of a lunar year in nanoseconds.
        """
        return int(60 * 60 * 24 * days * 1e9)  # Lunar Year in nanoseconds
Calculates the approximate duration of a lunar year in nanoseconds.
This function calculates the approximate duration of a lunar year based on the given number of days. It converts the given number of days into nanoseconds for use in high-precision timing applications.
Parameters: days: The number of days in a lunar year. Defaults to 355, which is an approximation of the average length of a lunar year.
Returns: The approximate duration of a lunar year in nanoseconds.
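The conversion is plain arithmetic: days × 86,400 seconds × 10^9 nanoseconds. The sketch below reproduces it with integer arithmetic only (a deliberate variation on the float constant `1e9` in the source) and adds a hypothetical `completed_cycles` helper showing how a nanosecond timestamp could be turned into a count of elapsed lunar years.

```python
NS_PER_DAY = 24 * 60 * 60 * 10**9  # nanoseconds in one day


def time_cycle_ns(days: int = 355) -> int:
    """Standalone stand-in for TimeCycle, using integers only."""
    return days * NS_PER_DAY


def completed_cycles(created_ns: int, now_ns: int, days: int = 355) -> int:
    """Hypothetical helper: whole cycles elapsed since `created_ns`."""
    return max(0, (now_ns - created_ns) // time_cycle_ns(days))


cycle = time_cycle_ns()
now = 3 * cycle + 12345          # a little over three cycles after time 0
cycles = completed_cycles(0, now)
print(cycle, cycles)
```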
    @staticmethod
    def Nisab(gram_price: float, gram_quantity: float = 595) -> float:
        """
        Calculate the total value of the Nisab (the minimum wealth threshold for Zakat
        in Islamic jurisprudence) based on the given price per gram.

        This function calculates the Nisab value, which is the minimum threshold of wealth,
        that makes an individual liable for paying Zakat.
        The Nisab value is determined by the equivalent value of a specific amount
        of gold or silver (currently 595 grams in silver) in the local currency.

        Parameters:
        - gram_price (float): The price per gram of the Nisab metal (silver by default).
        - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver.

        Returns:
        - float: The total value of Nisab based on the given price per gram.
        """
        return gram_price * gram_quantity
Calculate the total value of the Nisab (the minimum wealth threshold for Zakat in Islamic jurisprudence) based on the given price per gram.
This function calculates the Nisab value, which is the minimum threshold of wealth, that makes an individual liable for paying Zakat. The Nisab value is determined by the equivalent value of a specific amount of gold or silver (currently 595 grams in silver) in the local currency.
Parameters:
- gram_price (float): The price per gram of the Nisab metal (silver by default).
- gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver.
Returns:
- float: The total value of Nisab based on the given price per gram.
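A quick worked example of the formula, `gram_price * gram_quantity`. The function `nisab` is a standalone copy for illustration; the price of 2.17 per gram is the figure the test listing passes to `check`, and the two holdings mirror its 1000 and 2000 cases.

```python
def nisab(gram_price: float, gram_quantity: float = 595) -> float:
    """Standalone stand-in for Nisab: threshold value in local currency."""
    return gram_price * gram_quantity


threshold = nisab(2.17)              # 2.17 x 595, roughly 1291.15
holdings = {'small': 1000, 'large': 2000}
liable = {name: value >= threshold for name, value in holdings.items()}
print(round(threshold, 2), liable)
```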
    @staticmethod
    def ext() -> str:
        """
        Returns the file extension used by the ZakatTracker class.

        Returns:
        str: The file extension used by the ZakatTracker class, which is 'camel'.
        """
        return 'camel'
Returns the file extension used by the ZakatTracker class.
Returns: str: The file extension used by the ZakatTracker class, which is 'camel'.
    def path(self, path: str = None) -> str:
        """
        Set or get the path to the database file.

        If no path is provided, the current path is returned.
        If a path is provided, it is set as the new path.
        The function also creates the necessary directories: a path that names an existing
        file or has a file extension is treated as a file, and its parent directory is created.

        Parameters:
        path (str): The new path to the database file. If not provided, the current path is returned.

        Returns:
        str: The current or new path to the database file.
        """
        if path is None:
            return self._vault_path
        self._vault_path = Path(path).resolve()
        base_path = Path(path).resolve()
        if base_path.is_file() or base_path.suffix:
            base_path = base_path.parent
        base_path.mkdir(parents=True, exist_ok=True)
        self._base_path = base_path
        return str(self._vault_path)
Set or get the path to the database file.
If no path is provided, the current path is returned. If a path is provided, it is set as the new path. The function also creates the necessary directories: a path that names an existing file or has a file extension is treated as a file, and its parent directory is created.
Parameters: path (str): The new path to the database file. If not provided, the current path is returned.
Returns: str: The current or new path to the database file.
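The directory handling can be tried in isolation with `pathlib`. The function `prepare_db_path` below is a hypothetical standalone rewrite of the same steps, run against a temporary directory so nothing is left behind.

```python
import tempfile
from pathlib import Path


def prepare_db_path(path: str) -> tuple[Path, Path]:
    """Hypothetical helper: resolve a database path and create its directory."""
    vault_path = Path(path).resolve()
    base = vault_path
    # A path with an extension (or an existing file) is treated as a file,
    # so the directory to create is its parent.
    if base.is_file() or base.suffix:
        base = base.parent
    base.mkdir(parents=True, exist_ok=True)
    return vault_path, base


with tempfile.TemporaryDirectory() as tmp:
    vault_path, base = prepare_db_path(f"{tmp}/zakat_db/zakat.camel")
    dir_created = base.is_dir()
    file_created = vault_path.exists()   # only the directory is made, not the file

print(vault_path.name, base.name, dir_created, file_created)
```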
    def base_path(self, *args) -> str:
        """
        Generate a base path by joining the provided arguments with the existing base path.

        Parameters:
        *args (str): Variable length argument list of strings to be joined with the base path.

        Returns:
        str: The generated base path. If no arguments are provided, the existing base path is returned.
        """
        if not args:
            return str(self._base_path)
        filtered_args = []
        ignored_filename = None
        for arg in args:
            if Path(arg).suffix:
                ignored_filename = arg  # looks like a filename; do not create it as a directory
            else:
                filtered_args.append(arg)
        base_path = Path(self._base_path)
        full_path = base_path.joinpath(*filtered_args)
        full_path.mkdir(parents=True, exist_ok=True)
        if ignored_filename is not None:
            # Join with the ignored filename; convert to str to match the declared return type
            return str(full_path.resolve() / ignored_filename)
        return str(full_path.resolve())
Generate a base path by joining the provided arguments with the existing base path.
Parameters: *args (str): Variable length argument list of strings to be joined with the base path.
Returns: str: The generated base path. If no arguments are provided, the existing base path is returned.
    @staticmethod
    def scale(x: float | int | Decimal, decimal_places: int = 2) -> int:
        """
        Scales a numerical value by a specified power of 10, returning an integer.

        This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and
        facilitate precise scaling operations, particularly useful in financial or scientific calculations.

        Parameters:
        x: The numeric value to scale. Can be a floating-point number, integer, or decimal.
        decimal_places: The exponent for the scaling factor (10**decimal_places). Defaults to 2,
                        meaning the input is scaled by a factor of 100 (e.g., converts 1.23 to 123).

        Returns:
        The scaled value, rounded to the nearest integer.

        Raises:
        TypeError: If the input `x` is not a valid numeric type.

        Examples:
        >>> ZakatTracker.scale(3.14159)
        314
        >>> ZakatTracker.scale(1234, decimal_places=3)
        1234000
        >>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4)
        50
        """
        if not isinstance(x, (float, int, Decimal)):
            raise TypeError("Input 'x' must be a float, int, or Decimal.")
        return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))
Scales a numerical value by a specified power of 10, returning an integer.
This function is designed to handle various numeric types (float, int, or Decimal) and facilitate precise scaling operations, particularly useful in financial or scientific calculations.
Parameters: x: The numeric value to scale. Can be a floating-point number, integer, or decimal. decimal_places: The exponent for the scaling factor (10**decimal_places). Defaults to 2, meaning the input is scaled by a factor of 100 (e.g., converts 1.23 to 123).
Returns: The scaled value, rounded to the nearest integer.
Raises: TypeError: If the input x is not a valid numeric type.
Examples:
>>> ZakatTracker.scale(3.14159)
314
>>> ZakatTracker.scale(1234, decimal_places=3)
1234000
>>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4)
50
    @staticmethod
    def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal:
        """
        Unscales an integer by a power of 10.

        Parameters:
        x: The integer to unscale.
        return_type: The desired type for the returned value. Can be float or Decimal. Defaults to float.
        decimal_places: The power of 10 to use. Defaults to 2.

        Returns:
        The unscaled number, converted to the specified return_type.

        Raises:
        TypeError: If the return_type is not float or Decimal.
        """
        # Only float and Decimal are accepted; int is rejected here.
        if return_type not in (float, Decimal):
            raise TypeError(f'Invalid return_type({return_type}). Supported types are float and Decimal.')
        return round(return_type(x / (10 ** decimal_places)), decimal_places)
Unscales an integer by a power of 10.
Parameters: x: The integer to unscale. return_type: The desired type for the returned value. Can be float or Decimal. Defaults to float. decimal_places: The power of 10 to use. Defaults to 2.
Returns: The unscaled number, converted to the specified return_type.
Raises: TypeError: If the return_type is not float or Decimal.
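The two helpers are easiest to follow as a round trip. The sketch below copies their logic into standalone functions (outside the `ZakatTracker` class, which is an assumption made only so the snippet runs on its own) and shows why `scale` formats the value before multiplying: a plain `int(1.15 * 100)` truncates the float product, while the formatted path rounds first.

```python
from decimal import Decimal


def scale(x, decimal_places: int = 2) -> int:
    """Standalone copy of the scaling logic from the listing above."""
    if not isinstance(x, (float, int, Decimal)):
        raise TypeError("Input 'x' must be a float, int, or Decimal.")
    # Formatting to a fixed number of places rounds the value first,
    # so the multiplication happens on an exact Decimal.
    return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))


def unscale(x: int, return_type: type = float, decimal_places: int = 2):
    """Standalone copy of the unscaling logic from the listing above."""
    if return_type not in (float, Decimal):
        raise TypeError(f"Invalid return_type({return_type}).")
    return round(return_type(x / (10 ** decimal_places)), decimal_places)


stored = scale(19.99)             # integer minor units, as kept in the vault
shown = unscale(stored)           # back to a float for display
exact = unscale(stored, Decimal)  # or to a Decimal

print(stored, shown, exact)       # 1999 19.99 19.99
print(scale(1.15))                # 115, whereas int(1.15 * 100) gives 114
```

Storing integers and converting only at the edges keeps balances free of accumulated floating-point error.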
422 def reset(self) -> None: 423 """ 424 Reset the internal data structure to its initial state. 425 426 Parameters: 427 None 428 429 Returns: 430 None 431 """ 432 self._vault = { 433 'account': {}, 434 'exchange': {}, 435 'history': {}, 436 'lock': None, 437 'report': {}, 438 }
Reset the internal data structure to its initial state.
Parameters: None
Returns: None
443 @staticmethod 444 def minimum_time_diff_ns() -> tuple[int, int]: 445 """ 446 Calculates the minimum time difference between two consecutive calls to 447 `ZakatTracker._time()` in nanoseconds. 448 449 This method is used internally to determine the minimum granularity of 450 time measurements within the system. 451 452 Returns: 453 tuple[int, int]: 454 - The minimum time difference in nanoseconds. 455 - The number of iterations required to measure the difference. 456 """ 457 i = 0 458 x = y = ZakatTracker._time() 459 while x == y: 460 y = ZakatTracker._time() 461 i += 1 462 return y - x, i
Calculates the minimum time difference between two consecutive calls to ZakatTracker._time() in nanoseconds.
This method is used internally to determine the minimum granularity of time measurements within the system.
Returns: tuple[int, int]: - The minimum time difference in nanoseconds. - The number of iterations required to measure the difference.
481 @staticmethod 482 def time(now: datetime.datetime = None) -> int: 483 """ 484 Generates a unique, monotonically increasing timestamp based on the provided 485 datetime object or the current datetime. 486 487 This method ensures that timestamps are unique even if called in rapid succession 488 by introducing a small delay if necessary, based on the system's minimum 489 time resolution. 490 491 Parameters: 492 now (datetime.datetime, optional): The datetime object to generate the timestamp from. 493 If not provided, the current datetime is used. 494 495 Returns: 496 int: The unique timestamp in nanoseconds since the epoch (January 1, 1AD). 497 """ 498 new_time = ZakatTracker._time(now) 499 if ZakatTracker._last_time_ns is None: 500 ZakatTracker._last_time_ns = new_time 501 return new_time 502 while new_time == ZakatTracker._last_time_ns: 503 if ZakatTracker._time_diff_ns is None: 504 diff, _ = ZakatTracker.minimum_time_diff_ns() 505 ZakatTracker._time_diff_ns = ceil(diff) 506 sleep(ZakatTracker._time_diff_ns / 1_000_000_000) 507 new_time = ZakatTracker._time() 508 ZakatTracker._last_time_ns = new_time 509 return new_time
Generates a unique, monotonically increasing timestamp based on the provided datetime object or the current datetime.
This method ensures that timestamps are unique even if called in rapid succession by introducing a small delay if necessary, based on the system's minimum time resolution.
Parameters: now (datetime.datetime, optional): The datetime object to generate the timestamp from. If not provided, the current datetime is used.
Returns: int: The unique timestamp in nanoseconds since the epoch (January 1, 1AD).
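The uniqueness guarantee comes from remembering the last timestamp handed out and waiting whenever the clock returns the same value again. A minimal sketch of that idea follows; it assumes `time.time_ns()` as the clock and a fixed one-microsecond sleep, whereas the listing uses the library's own `_time()` and a measured minimum clock difference.

```python
import time

_last_ns = None  # stand-in for the class-level "last timestamp" attribute


def unique_time_ns() -> int:
    """Return a timestamp that differs from the one returned just before it."""
    global _last_ns
    now = time.time_ns()
    # If the clock has not advanced since the previous call, wait and retry.
    while now == _last_ns:
        time.sleep(1e-6)  # assumed delay; the listing derives it from the clock resolution
        now = time.time_ns()
    _last_ns = now
    return now


stamps = [unique_time_ns() for _ in range(1000)]
print(all(a != b for a, b in zip(stamps, stamps[1:])))  # True
```

Because these timestamps double as dictionary keys for boxes and log entries, two calls that produced the same value would overwrite each other's records.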
511 @staticmethod 512 def time_to_datetime(ordinal_ns: int) -> datetime.datetime: 513 """ 514 Converts a nanosecond-precision timestamp (ordinal number of nanoseconds since 1AD) 515 back to a datetime object. 516 517 Parameters: 518 ordinal_ns (int): The timestamp in nanoseconds since the epoch (January 1, 1AD). 519 520 Returns: 521 datetime.datetime: The corresponding datetime object. 522 """ 523 d = datetime.datetime.fromordinal(ordinal_ns // 86_400_000_000_000) 524 t = datetime.timedelta(seconds=(ordinal_ns % 86_400_000_000_000) // 10 ** 9) 525 return datetime.datetime.combine(d, datetime.time()) + t
Converts a nanosecond-precision timestamp (ordinal number of nanoseconds since 1AD) back to a datetime object. The result is truncated to whole seconds; the sub-second part of the timestamp is discarded.
Parameters: ordinal_ns (int): The timestamp in nanoseconds since the epoch (January 1, 1AD).
Returns: datetime.datetime: The corresponding datetime object.
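The conversion splits the timestamp into a day ordinal and the whole seconds elapsed within that day. The sketch below repeats that arithmetic as a standalone function and pairs it with `datetime_to_ordinal_ns`, a hypothetical inverse written only for this illustration (the library's own `_time()` is not shown in this section and may differ).

```python
import datetime

NS_PER_DAY = 86_400_000_000_000


def time_to_datetime(ordinal_ns: int) -> datetime.datetime:
    """Same arithmetic as the listing: day ordinal plus whole seconds."""
    d = datetime.datetime.fromordinal(ordinal_ns // NS_PER_DAY)
    t = datetime.timedelta(seconds=(ordinal_ns % NS_PER_DAY) // 10 ** 9)
    return datetime.datetime.combine(d, datetime.time()) + t


def datetime_to_ordinal_ns(dt: datetime.datetime) -> int:
    """Hypothetical inverse, assumed here for illustration only."""
    midnight = datetime.datetime.combine(dt.date(), datetime.time())
    since_midnight = dt - midnight
    ns_in_day = (since_midnight.seconds * 10 ** 6 + since_midnight.microseconds) * 1000
    return dt.toordinal() * NS_PER_DAY + ns_in_day


original = datetime.datetime(2024, 3, 1, 12, 30, 45, 123456)
restored = time_to_datetime(datetime_to_ordinal_ns(original))
print(restored)  # 2024-03-01 12:30:45 (microseconds are dropped)
```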
527 def clean_history(self, lock: int | None = None) -> int: 528 """ 529 Cleans up the history of actions performed on the ZakatTracker instance. 530 531 Parameters: 532 lock (int, optional): The lock ID is used to clean up the empty history. 533 If not provided, it cleans up the empty history records for all locks. 534 535 Returns: 536 int: The number of locks cleaned up. 537 """ 538 count = 0 539 if lock in self._vault['history']: 540 if len(self._vault['history'][lock]) <= 0: 541 count += 1 542 del self._vault['history'][lock] 543 return count 544 self.free(self.lock()) 545 for lock in self._vault['history']: 546 if len(self._vault['history'][lock]) <= 0: 547 count += 1 548 del self._vault['history'][lock] 549 return count
Cleans up the history of actions performed on the ZakatTracker instance.
Parameters: lock (int, optional): The lock ID whose empty history record should be removed. If not provided, empty history records are removed for all locks.
Returns: int: The number of locks cleaned up.
587 def nolock(self) -> bool: 588 """ 589 Check if the vault lock is currently not set. 590 591 Returns: 592 bool: True if the vault lock is not set, False otherwise. 593 """ 594 return self._vault['lock'] is None
Check if the vault lock is currently not set.
Returns: bool: True if the vault lock is not set, False otherwise.
596 def lock(self) -> int: 597 """ 598 Acquires a lock on the ZakatTracker instance. 599 600 Returns: 601 int: The lock ID. This ID can be used to release the lock later. 602 """ 603 return self._step()
Acquires a lock on the ZakatTracker instance.
Returns: int: The lock ID. This ID can be used to release the lock later.
605 def vault(self) -> dict: 606 """ 607 Returns a copy of the internal vault dictionary. 608 609 This method is used to retrieve the current state of the ZakatTracker object. 610 It provides a snapshot of the internal data structure, allowing for further 611 processing or analysis. 612 613 Returns: 614 dict: A copy of the internal vault dictionary. 615 """ 616 return self._vault.copy()
Returns a shallow copy of the internal vault dictionary.
This method is used to retrieve the current state of the ZakatTracker object. It provides a snapshot of the top-level structure for further processing or analysis; nested dictionaries such as the accounts are shared with the tracker, so modifying them modifies the tracker's state.
Returns: dict: A shallow copy of the internal vault dictionary.
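The listing returns `self._vault.copy()`, and `dict.copy()` is a shallow copy. The sketch below uses a plain dictionary shaped loosely like the vault (the account name and balance are made up for the example) to show what is and is not isolated, with `copy.deepcopy` as one option when a fully independent snapshot is needed.

```python
import copy

vault = {"account": {"cash": {"balance": 100}}, "lock": None}

shallow = vault.copy()        # what dict.copy() produces
deep = copy.deepcopy(vault)   # fully independent snapshot

shallow["account"]["cash"]["balance"] = 0  # nested dict is shared with vault
shallow["lock"] = 123                      # top-level key is not shared

print(vault["account"]["cash"]["balance"])  # 0
print(vault["lock"])                        # None
print(deep["account"]["cash"]["balance"])   # 100
```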
618 def stats_init(self) -> dict[str, tuple[int, str]]: 619 """ 620 Initialize and return a dictionary containing initial statistics for the ZakatTracker instance. 621 622 The dictionary contains two keys: 'database' and 'ram'. Each key maps to a tuple containing two elements: 623 - The initial size of the respective statistic in bytes (int). 624 - The initial size of the respective statistic in a human-readable format (str). 625 626 Returns: 627 dict[str, tuple]: A dictionary with initial statistics for the ZakatTracker instance. 628 """ 629 return { 630 'database': (0, '0'), 631 'ram': (0, '0'), 632 }
Initialize and return a dictionary containing initial statistics for the ZakatTracker instance.
The dictionary contains two keys: 'database' and 'ram'. Each key maps to a tuple containing two elements:
- The initial size of the respective statistic in bytes (int).
- The initial size of the respective statistic in a human-readable format (str).
Returns: dict[str, tuple]: A dictionary with initial statistics for the ZakatTracker instance.
634 def stats(self, ignore_ram: bool = True) -> dict[str, tuple[int, str]]: 635 """ 636 Calculates and returns statistics about the object's data storage. 637 638 This method determines the size of the database file on disk and the 639 size of the data currently held in RAM (likely within a dictionary). 640 Both sizes are reported in bytes and in a human-readable format 641 (e.g., KB, MB). 642 643 Parameters: 644 ignore_ram (bool): Whether to ignore the RAM size. Default is True 645 646 Returns: 647 dict[str, tuple]: A dictionary containing the following statistics: 648 649 * 'database': A tuple with two elements: 650 - The database file size in bytes (int). 651 - The database file size in human-readable format (str). 652 * 'ram': A tuple with two elements: 653 - The RAM usage (dictionary size) in bytes (int). 654 - The RAM usage in human-readable format (str). 655 656 Example: 657 >>> stats = my_object.stats() 658 >>> print(stats['database']) 659 (256000, '250.0 KB') 660 >>> print(stats['ram']) 661 (12345, '12.1 KB') 662 """ 663 ram_size = 0.0 if ignore_ram else self.get_dict_size(self.vault()) 664 file_size = os.path.getsize(self.path()) 665 return { 666 'database': (file_size, self.human_readable_size(file_size)), 667 'ram': (ram_size, self.human_readable_size(ram_size)), 668 }
Calculates and returns statistics about the object's data storage.
This method determines the size of the database file on disk and the size of the vault dictionary currently held in RAM. Both sizes are reported in bytes and in a human-readable format (e.g., KB, MB).
Parameters: ignore_ram (bool): Whether to skip measuring the RAM size, in which case it is reported as 0. Default is True.
Returns: dict[str, tuple]: A dictionary containing the following statistics:
* 'database': A tuple with two elements:
- The database file size in bytes (int).
- The database file size in human-readable format (str).
* 'ram': A tuple with two elements:
- The RAM usage (dictionary size) in bytes (int).
- The RAM usage in human-readable format (str).
Example:
>>> stats = my_object.stats(ignore_ram=False)
>>> print(stats['database'])
(256000, '250.0 KB')
>>> print(stats['ram'])
(12345, '12.1 KB')
670 def files(self) -> list[dict[str, str | int]]: 671 """ 672 Retrieves information about files associated with this class. 673 674 This class method provides a standardized way to gather details about 675 files used by the class for storage, snapshots, and CSV imports. 676 677 Returns: 678 list[dict[str, str | int]]: A list of dictionaries, each containing information 679 about a specific file: 680 681 * type (str): The type of file ('database', 'snapshot', 'import_csv'). 682 * path (str): The full file path. 683 * exists (bool): Whether the file exists on the filesystem. 684 * size (int): The file size in bytes (0 if the file doesn't exist). 685 * human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB'). 686 687 Example: 688 ``` 689 file_info = MyClass.files() 690 for info in file_info: 691 print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}") 692 ``` 693 """ 694 result = [] 695 for file_type, path in { 696 'database': self.path(), 697 'snapshot': self.snapshot_cache_path(), 698 'import_csv': self.import_csv_cache_path(), 699 }.items(): 700 exists = os.path.exists(path) 701 size = os.path.getsize(path) if exists else 0 702 human_readable_size = self.human_readable_size(size) if exists else 0 703 result.append({ 704 'type': file_type, 705 'path': path, 706 'exists': exists, 707 'size': size, 708 'human_readable_size': human_readable_size, 709 }) 710 return result
Retrieves information about files associated with this class.
This method provides a standardized way to gather details about files used by the tracker for storage, snapshots, and CSV imports.
Returns: list[dict[str, str | int]]: A list of dictionaries, each containing information about a specific file:
* type (str): The type of file ('database', 'snapshot', 'import_csv').
* path (str): The full file path.
* exists (bool): Whether the file exists on the filesystem.
* size (int): The file size in bytes (0 if the file doesn't exist).
* human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB'), or the integer 0 if the file doesn't exist.
Example:
file_info = my_object.files()
for info in file_info:
    print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}")
712 def steps(self) -> dict: 713 """ 714 Returns a copy of the history of steps taken in the ZakatTracker. 715 716 The history is a dictionary where each key is a unique identifier for a step, 717 and the corresponding value is a dictionary containing information about the step. 718 719 Returns: 720 dict: A copy of the history of steps taken in the ZakatTracker. 721 """ 722 return self._vault['history'].copy()
Returns a copy of the history of steps taken in the ZakatTracker.
The history is a dictionary where each key is a unique identifier for a step, and the corresponding value is a dictionary containing information about the step.
Returns: dict: A copy of the history of steps taken in the ZakatTracker.
724 def free(self, lock: int, auto_save: bool = True) -> bool: 725 """ 726 Releases the lock on the database. 727 728 Parameters: 729 lock (int): The lock ID to be released. 730 auto_save (bool): Whether to automatically save the database after releasing the lock. 731 732 Returns: 733 bool: True if the lock is successfully released and (optionally) saved, False otherwise. 734 """ 735 if lock == self._vault['lock']: 736 self._vault['lock'] = None 737 self.clean_history(lock) 738 if auto_save: 739 return self.save(self.path()) 740 return True 741 return False
Releases the lock on the database.
Parameters: lock (int): The lock ID to be released. auto_save (bool): Whether to automatically save the database after releasing the lock.
Returns: bool: True if the lock is successfully released and (optionally) saved, False otherwise.
743 def account_exists(self, account) -> bool: 744 """ 745 Check if the given account exists in the vault. 746 747 Parameters: 748 account (str): The account number to check. 749 750 Returns: 751 bool: True if the account exists, False otherwise. 752 """ 753 return account in self._vault['account']
Check if the given account exists in the vault.
Parameters: account (str): The account number to check.
Returns: bool: True if the account exists, False otherwise.
755 def box_size(self, account) -> int: 756 """ 757 Calculate the size of the box for a specific account. 758 759 Parameters: 760 account (str): The account number for which the box size needs to be calculated. 761 762 Returns: 763 int: The size of the box for the given account. If the account does not exist, -1 is returned. 764 """ 765 if self.account_exists(account): 766 return len(self._vault['account'][account]['box']) 767 return -1
Calculate the size of the box for a specific account.
Parameters: account (str): The account number for which the box size needs to be calculated.
Returns: int: The size of the box for the given account. If the account does not exist, -1 is returned.
769 def log_size(self, account) -> int: 770 """ 771 Get the size of the log for a specific account. 772 773 Parameters: 774 account (str): The account number for which the log size needs to be calculated. 775 776 Returns: 777 int: The size of the log for the given account. If the account does not exist, -1 is returned. 778 """ 779 if self.account_exists(account): 780 return len(self._vault['account'][account]['log']) 781 return -1
Get the size of the log for a specific account.
Parameters: account (str): The account number for which the log size needs to be calculated.
Returns: int: The size of the log for the given account. If the account does not exist, -1 is returned.
783 @staticmethod 784 def file_hash(file_path: str, algorithm: str = "blake2b") -> str: 785 """ 786 Calculates the hash of a file using the specified algorithm. 787 788 Parameters: 789 file_path (str): The path to the file. 790 algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b". 791 792 Returns: 793 str: The hexadecimal representation of the file's hash. 794 """ 795 hash_obj = hashlib.new(algorithm) # Create the hash object 796 with open(file_path, "rb") as f: # Open file in binary mode for reading 797 for chunk in iter(lambda: f.read(4096), b""): # Read file in chunks 798 hash_obj.update(chunk) 799 return hash_obj.hexdigest() # Return the hash as a hexadecimal string
Calculates the hash of a file using the specified algorithm.
Parameters: file_path (str): The path to the file. algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b".
Returns: str: The hexadecimal representation of the file's hash.
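Reading in fixed-size chunks keeps memory use constant regardless of file size, and the result is identical to hashing the whole content at once. The sketch below repeats the listing's logic as a standalone function and checks it against a one-shot hash of a temporary file; the payload is arbitrary test data.

```python
import hashlib
import os
import tempfile


def file_hash(file_path: str, algorithm: str = "blake2b") -> str:
    """Hash a file in 4096-byte chunks, as in the listing above."""
    hash_obj = hashlib.new(algorithm)
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_obj.update(chunk)
    return hash_obj.hexdigest()


payload = b"zakat" * 10_000  # larger than a single chunk
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(payload)
    tmp_path = tmp.name
try:
    chunked = file_hash(tmp_path)
    sha = file_hash(tmp_path, "sha256")  # any name accepted by hashlib.new works
finally:
    os.remove(tmp_path)

one_shot = hashlib.blake2b(payload).hexdigest()
print(chunked == one_shot)  # True
```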
801 def snapshot_cache_path(self): 802 """ 803 Generate the path for the cache file used to store snapshots. 804 805 The cache file is a camel file that stores the timestamps of the snapshots. 806 The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel". 807 808 Returns: 809 str: The path to the cache file. 810 """ 811 path = str(self.path()) 812 ext = self.ext() 813 ext_len = len(ext) 814 if path.endswith(f'.{ext}'): 815 path = path[:-ext_len - 1] 816 _, filename = os.path.split(path + f'.snapshots.{ext}') 817 return self.base_path(filename)
Generate the path for the cache file used to store snapshots.
The cache file is a camel file that stores the timestamps of the snapshots. The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel".
Returns: str: The path to the cache file.
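The name derivation can be tried in isolation. The function below is a standalone sketch of the string handling in the listing; `snapshot_cache_filename` and its `ext` default are names chosen for this example, and the real method additionally passes the result through `base_path` to place it under the tracker's base directory.

```python
import os


def snapshot_cache_filename(db_path: str, ext: str = "camel") -> str:
    """Derive '<name>.snapshots.<ext>' from '<name>.<ext>'."""
    # Strip the extension (and its dot) only when it is actually present.
    if db_path.endswith(f".{ext}"):
        db_path = db_path[: -len(ext) - 1]
    # Keep only the file name; the directory part is discarded.
    _, filename = os.path.split(db_path + f".snapshots.{ext}")
    return filename


print(snapshot_cache_filename("/data/zakat.camel"))  # zakat.snapshots.camel
print(snapshot_cache_filename("ledger"))             # ledger.snapshots.camel
```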
819 def snapshot(self) -> bool: 820 """ 821 This function creates a snapshot of the current database state. 822 823 The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. 824 If a snapshot with the same hash exists, the function returns True without creating a new snapshot. 825 If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state 826 in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp. 827 828 Parameters: 829 None 830 831 Returns: 832 bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails. 833 """ 834 current_hash = self.file_hash(self.path()) 835 cache: dict[str, int] = {} # hash: time_ns 836 try: 837 with open(self.snapshot_cache_path(), 'r') as stream: 838 cache = camel.load(stream.read()) 839 except: 840 pass 841 if current_hash in cache: 842 return True 843 time = time_ns() 844 cache[current_hash] = time 845 if not self.save(self.base_path('snapshots', f'{time}.{self.ext()}')): 846 return False 847 with open(self.snapshot_cache_path(), 'w') as stream: 848 stream.write(camel.dump(cache)) 849 return True
This function creates a snapshot of the current database state.
The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. If a snapshot with the same hash exists, the function returns True without creating a new snapshot. If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp.
Parameters: None
Returns: bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails.
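The hash-keyed cache is what prevents duplicate snapshots of an unchanged database. The sketch below reproduces that flow under stated substitutions: it uses JSON instead of the camel format for the cache, copies the file instead of calling the tracker's `save`, and `take_snapshot` with its parameters is a name invented for this example.

```python
import hashlib
import json
import shutil
import tempfile
import time
from pathlib import Path


def take_snapshot(db_file: Path, snapshot_dir: Path, cache_file: Path) -> bool:
    """Copy db_file into snapshot_dir unless its current hash is already cached."""
    current_hash = hashlib.blake2b(db_file.read_bytes()).hexdigest()
    try:
        cache = json.loads(cache_file.read_text())  # {hash: time_ns}
    except (OSError, ValueError):
        cache = {}                                  # missing or unreadable cache
    if current_hash in cache:
        return True                                 # identical state already saved
    stamp = time.time_ns()
    snapshot_dir.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(db_file, snapshot_dir / f"{stamp}{db_file.suffix}")
    cache[current_hash] = stamp
    cache_file.write_text(json.dumps(cache))
    return True


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    db = root / "zakat.db"
    db.write_text("state-1")
    args = (db, root / "snapshots", root / "zakat.snapshots.json")
    take_snapshot(*args)
    take_snapshot(*args)          # same content: no new file
    after_repeat = len(list((root / "snapshots").iterdir()))
    db.write_text("state-2")
    take_snapshot(*args)          # changed content: second file
    after_change = len(list((root / "snapshots").iterdir()))

print(after_repeat, after_change)  # 1 2
```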
851 def snapshots(self, hide_missing: bool = True, verified_hash_only: bool = False) \ 852 -> dict[int, tuple[str, str, bool]]: 853 """ 854 Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status. 855 856 Parameters: 857 - hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True. 858 - verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False. 859 860 Returns: 861 - dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, 862 and the values are tuples containing the snapshot's hash, path, and existence status. 863 """ 864 cache: dict[str, int] = {} # hash: time_ns 865 try: 866 with open(self.snapshot_cache_path(), 'r') as stream: 867 cache = camel.load(stream.read()) 868 except: 869 pass 870 if not cache: 871 return {} 872 result: dict[int, tuple[str, str, bool]] = {} # time_ns: (hash, path, exists) 873 for file_hash, ref in cache.items(): 874 path = self.base_path('snapshots', f'{ref}.{self.ext()}') 875 exists = os.path.exists(path) 876 valid_hash = self.file_hash(path) == file_hash if verified_hash_only else True 877 if (verified_hash_only and not valid_hash) or (verified_hash_only and not exists): 878 continue 879 if exists or not hide_missing: 880 result[ref] = (file_hash, path, exists) 881 return result
Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status.
Parameters:
- hide_missing (bool): If True, only include snapshots whose files exist on disk. Default is True.
- verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False.
Returns:
- dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, and the values are tuples containing the snapshot's hash, path, and existence status.
883 def recall(self, dry=True, debug=False) -> bool: 884 """ 885 Revert the last operation. 886 887 Parameters: 888 dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. 889 debug (bool): If True, the function will print debug information. Default is False. 890 891 Returns: 892 bool: True if the operation was successful, False otherwise. 893 """ 894 if not self.nolock() or len(self._vault['history']) == 0: 895 return False 896 if len(self._vault['history']) <= 0: 897 return False 898 ref = sorted(self._vault['history'].keys())[-1] 899 if debug: 900 print('recall', ref) 901 memory = self._vault['history'][ref] 902 if debug: 903 print(type(memory), 'memory', memory) 904 limit = len(memory) + 1 905 sub_positive_log_negative = 0 906 for i in range(-1, -limit, -1): 907 x = memory[i] 908 if debug: 909 print(type(x), x) 910 match x['action']: 911 case Action.CREATE: 912 if x['account'] is not None: 913 if self.account_exists(x['account']): 914 if debug: 915 print('account', self._vault['account'][x['account']]) 916 assert len(self._vault['account'][x['account']]['box']) == 0 917 assert self._vault['account'][x['account']]['balance'] == 0 918 assert self._vault['account'][x['account']]['count'] == 0 919 if dry: 920 continue 921 del self._vault['account'][x['account']] 922 923 case Action.TRACK: 924 if x['account'] is not None: 925 if self.account_exists(x['account']): 926 if dry: 927 continue 928 self._vault['account'][x['account']]['balance'] -= x['value'] 929 self._vault['account'][x['account']]['count'] -= 1 930 del self._vault['account'][x['account']]['box'][x['ref']] 931 932 case Action.LOG: 933 if x['account'] is not None: 934 if self.account_exists(x['account']): 935 if x['ref'] in self._vault['account'][x['account']]['log']: 936 if dry: 937 continue 938 if sub_positive_log_negative == -x['value']: 939 self._vault['account'][x['account']]['count'] -= 1 940 sub_positive_log_negative = 0 941 box_ref = 
self._vault['account'][x['account']]['log'][x['ref']]['ref'] 942 if not box_ref is None: 943 assert self.box_exists(x['account'], box_ref) 944 box_value = self._vault['account'][x['account']]['log'][x['ref']]['value'] 945 assert box_value < 0 946 947 try: 948 self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value 949 except TypeError: 950 self._vault['account'][x['account']]['box'][box_ref]['rest'] += Decimal( 951 -box_value) 952 953 try: 954 self._vault['account'][x['account']]['balance'] += -box_value 955 except TypeError: 956 self._vault['account'][x['account']]['balance'] += Decimal(-box_value) 957 958 self._vault['account'][x['account']]['count'] -= 1 959 del self._vault['account'][x['account']]['log'][x['ref']] 960 961 case Action.SUB: 962 if x['account'] is not None: 963 if self.account_exists(x['account']): 964 if x['ref'] in self._vault['account'][x['account']]['box']: 965 if dry: 966 continue 967 self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value'] 968 self._vault['account'][x['account']]['balance'] += x['value'] 969 sub_positive_log_negative = x['value'] 970 971 case Action.ADD_FILE: 972 if x['account'] is not None: 973 if self.account_exists(x['account']): 974 if x['ref'] in self._vault['account'][x['account']]['log']: 975 if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']: 976 if dry: 977 continue 978 del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] 979 980 case Action.REMOVE_FILE: 981 if x['account'] is not None: 982 if self.account_exists(x['account']): 983 if x['ref'] in self._vault['account'][x['account']]['log']: 984 if dry: 985 continue 986 self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value'] 987 988 case Action.BOX_TRANSFER: 989 if x['account'] is not None: 990 if self.account_exists(x['account']): 991 if x['ref'] in self._vault['account'][x['account']]['box']: 992 if dry: 993 continue 994 
self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value'] 995 996 case Action.EXCHANGE: 997 if x['account'] is not None: 998 if x['account'] in self._vault['exchange']: 999 if x['ref'] in self._vault['exchange'][x['account']]: 1000 if dry: 1001 continue 1002 del self._vault['exchange'][x['account']][x['ref']] 1003 1004 case Action.REPORT: 1005 if x['ref'] in self._vault['report']: 1006 if dry: 1007 continue 1008 del self._vault['report'][x['ref']] 1009 1010 case Action.ZAKAT: 1011 if x['account'] is not None: 1012 if self.account_exists(x['account']): 1013 if x['ref'] in self._vault['account'][x['account']]['box']: 1014 if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]: 1015 if dry: 1016 continue 1017 match x['math']: 1018 case MathOperation.ADDITION: 1019 self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x[ 1020 'value'] 1021 case MathOperation.EQUAL: 1022 self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value'] 1023 case MathOperation.SUBTRACTION: 1024 self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x[ 1025 'value'] 1026 1027 if not dry: 1028 del self._vault['history'][ref] 1029 return True
Revert the last operation.
Parameters: dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. debug (bool): If True, the function will print debug information. Default is False.
Returns: bool: True if the operation was successful; False if a lock is currently held or the history is empty.
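The undo mechanism works by taking the newest history entry and replaying its recorded actions in reverse order, applying the inverse of each one. The sketch below is a heavily reduced model of that idea: it keeps only two of the action types, uses a module-level dictionary instead of the tracker's vault, and updates the balance directly inside `track`, all of which are simplifications made for this example.

```python
from enum import Enum, auto


class Action(Enum):  # reduced stand-in for the library's Action enum
    CREATE = auto()
    TRACK = auto()


vault = {"account": {}, "history": {}}


def track(step: int, account: str, ref: int, value: int) -> None:
    """Record a transaction and log each change under the history key `step`."""
    log = vault["history"].setdefault(step, [])
    if account not in vault["account"]:
        vault["account"][account] = {"balance": 0, "box": {}}
        log.append({"action": Action.CREATE, "account": account})
    acc = vault["account"][account]
    acc["balance"] += value
    acc["box"][ref] = {"capital": value, "rest": value}
    log.append({"action": Action.TRACK, "account": account, "ref": ref, "value": value})


def recall() -> bool:
    """Undo the most recent history entry by replaying its actions backwards."""
    if not vault["history"]:
        return False
    step = max(vault["history"])
    for x in reversed(vault["history"][step]):
        if x["action"] is Action.TRACK:
            acc = vault["account"][x["account"]]
            acc["balance"] -= x["value"]
            del acc["box"][x["ref"]]
        elif x["action"] is Action.CREATE:
            del vault["account"][x["account"]]
    del vault["history"][step]
    return True


track(step=1, account="cash", ref=1001, value=5000)
track(step=2, account="cash", ref=1002, value=250)
recall()  # undoes step 2 only
print(vault["account"]["cash"]["balance"])  # 5000
```

Reversing the order matters: the account created in a step can only be deleted after the transactions recorded later in that same step have been removed.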
1031 def ref_exists(self, account: str, ref_type: str, ref: int) -> bool: 1032 """ 1033 Check if a specific reference (transaction) exists in the vault for a given account and reference type. 1034 1035 Parameters: 1036 account (str): The account number for which to check the existence of the reference. 1037 ref_type (str): The type of reference (e.g., 'box', 'log', etc.). 1038 ref (int): The reference (transaction) number to check for existence. 1039 1040 Returns: 1041 bool: True if the reference exists for the given account and reference type, False otherwise. 1042 """ 1043 if account in self._vault['account']: 1044 return ref in self._vault['account'][account][ref_type] 1045 return False
Check if a specific reference (transaction) exists in the vault for a given account and reference type.
Parameters: account (str): The account number for which to check the existence of the reference. ref_type (str): The type of reference (e.g., 'box', 'log', etc.). ref (int): The reference (transaction) number to check for existence.
Returns: bool: True if the reference exists for the given account and reference type, False otherwise.
1047 def box_exists(self, account: str, ref: int) -> bool: 1048 """ 1049 Check if a specific box (transaction) exists in the vault for a given account and reference. 1050 1051 Parameters: 1052 - account (str): The account number for which to check the existence of the box. 1053 - ref (int): The reference (transaction) number to check for existence. 1054 1055 Returns: 1056 - bool: True if the box exists for the given account and reference, False otherwise. 1057 """ 1058 return self.ref_exists(account, 'box', ref)
Check if a specific box (transaction) exists in the vault for a given account and reference.
Parameters:
- account (str): The account number for which to check the existence of the box.
- ref (int): The reference (transaction) number to check for existence.
Returns:
- bool: True if the box exists for the given account and reference, False otherwise.
1060 def track(self, unscaled_value: float | int | Decimal = 0, desc: str = '', account: str = 1, logging: bool = True, 1061 created: int = None, 1062 debug: bool = False) -> int: 1063 """ 1064 This function tracks a transaction for a specific account. 1065 1066 Parameters: 1067 unscaled_value (float | int | Decimal): The value of the transaction. Default is 0. 1068 desc (str): The description of the transaction. Default is an empty string. 1069 account (str): The account for which the transaction is being tracked. Default is '1'. 1070 logging (bool): Whether to log the transaction. Default is True. 1071 created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. 1072 debug (bool): Whether to print debug information. Default is False. 1073 1074 Returns: 1075 int: The timestamp of the transaction. 1076 1077 This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box. 1078 1079 Raises: 1080 ValueError: The log transaction happened again in the same nanosecond time. 1081 ValueError: The box transaction happened again in the same nanosecond time. 
1082 """ 1083 if debug: 1084 print('track', f'unscaled_value={unscaled_value}, debug={debug}') 1085 if created is None: 1086 created = self.time() 1087 no_lock = self.nolock() 1088 self.lock() 1089 if not self.account_exists(account): 1090 if debug: 1091 print(f"account {account} created") 1092 self._vault['account'][account] = { 1093 'balance': 0, 1094 'box': {}, 1095 'count': 0, 1096 'log': {}, 1097 'hide': False, 1098 'zakatable': True, 1099 } 1100 self._step(Action.CREATE, account) 1101 if unscaled_value == 0: 1102 if no_lock: 1103 self.free(self.lock()) 1104 return 0 1105 value = self.scale(unscaled_value) 1106 if logging: 1107 self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug) 1108 if debug: 1109 print('create-box', created) 1110 if self.box_exists(account, created): 1111 raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).") 1112 if debug: 1113 print('created-box', created) 1114 self._vault['account'][account]['box'][created] = { 1115 'capital': value, 1116 'count': 0, 1117 'last': 0, 1118 'rest': value, 1119 'total': 0, 1120 } 1121 self._step(Action.TRACK, account, ref=created, value=value) 1122 if no_lock: 1123 self.free(self.lock()) 1124 return created
This function tracks a transaction for a specific account.
Parameters:
- unscaled_value (float | int | Decimal): The value of the transaction. Default is 0.
- desc (str): The description of the transaction. Default is an empty string.
- account (str): The account for which the transaction is being tracked. Default is '1'.
- logging (bool): Whether to log the transaction. Default is True.
- created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None.
- debug (bool): Whether to print debug information. Default is False.
Returns: int: The timestamp of the transaction.
This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box.
Raises:
- ValueError: If a log entry already exists for the same nanosecond timestamp.
- ValueError: If a box already exists for the same nanosecond timestamp.
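The box bookkeeping described above can be sketched outside the class as follows. This is a minimal illustration under stated assumptions, not the library's implementation: `track_sketch` and the flat `vault` layout are hypothetical names, and logging, locking, scaling, and history steps are left out.

```python
import time
from typing import Optional


def track_sketch(vault: dict, value: int, account: str, created: Optional[int] = None) -> int:
    """Hypothetical stand-in for the box-creation part of `track`."""
    if created is None:
        created = time.time_ns()  # nanosecond timestamp used as the box key
    # Create the account on first use, even for a zero-value call.
    boxes = vault.setdefault(account, {'box': {}})['box']
    if value == 0:
        return 0  # nothing to record
    if created in boxes:
        raise ValueError(f'The box transaction happened again in the same nanosecond time({created}).')
    boxes[created] = {
        'capital': value,  # original amount of the transaction
        'count': 0,
        'last': 0,
        'rest': value,     # amount still available in this box
        'total': 0,
    }
    return created


vault = {}
ref = track_sketch(vault, 100, 'account1', created=1)
```

Keying each box by its creation timestamp is what makes a second transaction in the same nanosecond an error rather than a silent overwrite.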
    def log_exists(self, account: str, ref: int) -> bool:
        """
        Checks if a specific transaction log entry exists for a given account.

        Parameters:
        account (str): The account number associated with the transaction log.
        ref (int): The reference to the transaction log entry.

        Returns:
        bool: True if the transaction log entry exists, False otherwise.
        """
        return self.ref_exists(account, 'log', ref)
Checks if a specific transaction log entry exists for a given account.
Parameters:
- account (str): The account number associated with the transaction log.
- ref (int): The reference to the transaction log entry.
Returns: bool: True if the transaction log entry exists, False otherwise.
    def exchange(self, account, created: int = None, rate: float = None, description: str = None,
                 debug: bool = False) -> dict:
        """
        This method is used to record or retrieve exchange rates for a specific account.

        Parameters:
        - account (str): The account number for which the exchange rate is being recorded or retrieved.
        - created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used.
        - rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate.
        - description (str): A description of the exchange rate.
        - debug (bool): Whether to print debug information. Default is False.

        Returns:
        - dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found,
        it returns a dictionary with default values for the rate and description.
        """
        if debug:
            print('exchange', f'debug={debug}')
        if created is None:
            created = self.time()
        no_lock = self.nolock()
        self.lock()
        if rate is not None:
            if rate <= 0:
                return dict()
            if account not in self._vault['exchange']:
                self._vault['exchange'][account] = {}
            if len(self._vault['exchange'][account]) == 0 and rate <= 1:
                return {"time": created, "rate": 1, "description": None}
            self._vault['exchange'][account][created] = {"rate": rate, "description": description}
            self._step(Action.EXCHANGE, account, ref=created, value=rate)
            if no_lock:
                self.free(self.lock())
            if debug:
                print("exchange-created-1",
                      f'account: {account}, created: {created}, rate:{rate}, description:{description}')

        if account in self._vault['exchange']:
            valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created]
            if valid_rates:
                latest_rate = max(valid_rates, key=lambda x: x[0])
                if debug:
                    print("exchange-read-1",
                          f'account: {account}, created: {created}, rate:{rate}, description:{description}',
                          'latest_rate', latest_rate)
                result = latest_rate[1]
                result['time'] = latest_rate[0]
                return result  # return a dictionary containing the rate and description
        if debug:
            print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}')
        return {"time": created, "rate": 1, "description": None}  # return the default value with an empty description
This method is used to record or retrieve exchange rates for a specific account.
Parameters:
- account (str): The account number for which the exchange rate is being recorded or retrieved.
- created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used.
- rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate.
- description (str): A description of the exchange rate.
- debug (bool): Whether to print debug information. Default is False.
Returns:
- dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, it returns a dictionary with default values for the rate and description.
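The retrieval rule (the most recent rate recorded at or before a given timestamp, falling back to a rate of 1) can be sketched in isolation. `latest_rate` and its `rates` argument are hypothetical names for this illustration; unlike the method above, the sketch returns a copy instead of adding the `time` key to the stored entry.

```python
def latest_rate(rates: dict, at: int) -> dict:
    """Return the newest rate entry whose timestamp is <= `at`.

    `rates` maps timestamps to {'rate': ..., 'description': ...} entries.
    """
    valid = [(ts, entry) for ts, entry in rates.items() if ts <= at]
    if not valid:
        # No rate known yet: fall back to the neutral rate of 1.
        return {'time': at, 'rate': 1, 'description': None}
    ts, entry = max(valid, key=lambda pair: pair[0])
    # Copy the entry so the stored record is left untouched.
    return {**entry, 'time': ts}


rates = {
    10: {'rate': 3.75, 'description': 'first quote'},
    20: {'rate': 4.0, 'description': 'second quote'},
}
current = latest_rate(rates, at=15)  # picks the entry recorded at timestamp 10
```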
    @staticmethod
    def exchange_calc(x: float, x_rate: float, y_rate: float) -> float:
        """
        This function calculates the exchanged amount of a currency.

        Args:
        x (float): The original amount of the currency.
        x_rate (float): The exchange rate of the original currency.
        y_rate (float): The exchange rate of the target currency.

        Returns:
        float: The exchanged amount of the target currency.
        """
        return (x * x_rate) / y_rate
This function calculates the exchanged amount of a currency.
Args:
- x (float): The original amount of the currency.
- x_rate (float): The exchange rate of the original currency.
- y_rate (float): The exchange rate of the target currency.
Returns: float: The exchanged amount of the target currency.
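As a worked example of the formula `(x * x_rate) / y_rate`, the standalone copy below converts an amount into a base currency and back. The rate of 3.75 is an arbitrary illustrative figure, not a value taken from the library.

```python
def exchange_calc(x: float, x_rate: float, y_rate: float) -> float:
    """Standalone copy of the formula: amount * source rate / target rate."""
    return (x * x_rate) / y_rate


# 100 units of a currency quoted at 3.75, expressed in the base currency (rate 1).
in_base = exchange_calc(100, 3.75, 1)

# Converting the result back with the rates swapped recovers the original amount.
round_trip = exchange_calc(in_base, 1, 3.75)
```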
    def exchanges(self) -> dict:
        """
        Retrieve the recorded exchange rates for all accounts.

        Parameters:
        None

        Returns:
        dict: A dictionary containing all recorded exchange rates.
        The keys are account names or numbers, and the values are dictionaries containing the exchange rates.
        Each exchange rate dictionary has timestamps as keys and exchange rate details as values.
        """
        return self._vault['exchange'].copy()
Retrieve the recorded exchange rates for all accounts.
Parameters: None
Returns: dict: A dictionary containing all recorded exchange rates. The keys are account names or numbers, and the values are dictionaries containing the exchange rates. Each exchange rate dictionary has timestamps as keys and exchange rate details as values.
    def accounts(self) -> dict:
        """
        Returns a dictionary containing account numbers as keys and their respective balances as values.

        Parameters:
        None

        Returns:
        dict: A dictionary where keys are account numbers and values are their respective balances.
        """
        result = {}
        for i in self._vault['account']:
            result[i] = self._vault['account'][i]['balance']
        return result
Returns a dictionary containing account numbers as keys and their respective balances as values.
Parameters: None
Returns: dict: A dictionary where keys are account numbers and values are their respective balances.
    def boxes(self, account) -> dict:
        """
        Retrieve the boxes (transactions) associated with a specific account.

        Parameters:
        account (str): The account number for which to retrieve the boxes.

        Returns:
        dict: A dictionary containing the boxes associated with the given account.
        If the account does not exist, an empty dictionary is returned.
        """
        if self.account_exists(account):
            return self._vault['account'][account]['box']
        return {}
Retrieve the boxes (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the boxes.
Returns: dict: A dictionary containing the boxes associated with the given account. If the account does not exist, an empty dictionary is returned.
    def logs(self, account) -> dict:
        """
        Retrieve the logs (transactions) associated with a specific account.

        Parameters:
        account (str): The account number for which to retrieve the logs.

        Returns:
        dict: A dictionary containing the logs associated with the given account.
        If the account does not exist, an empty dictionary is returned.
        """
        if self.account_exists(account):
            return self._vault['account'][account]['log']
        return {}
Retrieve the logs (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the logs.
Returns: dict: A dictionary containing the logs associated with the given account. If the account does not exist, an empty dictionary is returned.
    def daily_logs_init(self) -> dict[str, dict]:
        """
        Initialize a dictionary to store daily, weekly, monthly, and yearly logs.

        Returns:
        dict: A dictionary with keys 'daily', 'weekly', 'monthly', and 'yearly', each containing an empty dictionary.
        Later each key maps to another dictionary, which will store the logs for the corresponding time period.
        """
        return {
            'daily': {},
            'weekly': {},
            'monthly': {},
            'yearly': {},
        }
Initialize a dictionary to store daily, weekly, monthly, and yearly logs.
Returns: dict: A dictionary with keys 'daily', 'weekly', 'monthly', and 'yearly', each containing an empty dictionary. Later each key maps to another dictionary, which will store the logs for the corresponding time period.
1326 def daily_logs(self, weekday: WeekDay = WeekDay.Friday, debug: bool = False): 1327 """ 1328 Retrieve the daily logs (transactions) from all accounts. 1329 1330 The function groups the logs by day, month, and year, and calculates the total value for each group. 1331 It returns a dictionary where the keys are the timestamps of the daily groups, 1332 and the values are dictionaries containing the total value and the logs for that group. 1333 1334 Parameters: 1335 weekday (WeekDay): Select the weekday is collected for the week data. Default is WeekDay.Friday. 1336 debug (bool): Whether to print debug information. Default is False. 1337 1338 Returns: 1339 dict: A dictionary containing the daily logs. 1340 1341 Example: 1342 >>> tracker = ZakatTracker() 1343 >>> tracker.sub(51, 'desc', 'account1') 1344 >>> ref = tracker.track(100, 'desc', 'account2') 1345 >>> tracker.add_file('account2', ref, 'file_0') 1346 >>> tracker.add_file('account2', ref, 'file_1') 1347 >>> tracker.add_file('account2', ref, 'file_2') 1348 >>> tracker.daily_logs() 1349 { 1350 'daily': { 1351 '2024-06-30': { 1352 'positive': 100, 1353 'negative': 51, 1354 'total': 99, 1355 'rows': [ 1356 { 1357 'account': 'account1', 1358 'desc': 'desc', 1359 'file': {}, 1360 'ref': None, 1361 'value': -51, 1362 'time': 1690977015000000000, 1363 'transfer': False, 1364 }, 1365 { 1366 'account': 'account2', 1367 'desc': 'desc', 1368 'file': { 1369 1722919011626770944: 'file_0', 1370 1722919011626812928: 'file_1', 1371 1722919011626846976: 'file_2', 1372 }, 1373 'ref': None, 1374 'value': 100, 1375 'time': 1690977015000000000, 1376 'transfer': False, 1377 }, 1378 ], 1379 }, 1380 }, 1381 'weekly': { 1382 datetime: { 1383 'positive': 100, 1384 'negative': 51, 1385 'total': 99, 1386 }, 1387 }, 1388 'monthly': { 1389 '2024-06': { 1390 'positive': 100, 1391 'negative': 51, 1392 'total': 99, 1393 }, 1394 }, 1395 'yearly': { 1396 2024: { 1397 'positive': 100, 1398 'negative': 51, 1399 'total': 99, 1400 }, 1401 }, 1402 } 
1403 """ 1404 logs = {} 1405 for account in self.accounts(): 1406 for k, v in self.logs(account).items(): 1407 v['time'] = k 1408 v['account'] = account 1409 if k not in logs: 1410 logs[k] = [] 1411 logs[k].append(v) 1412 if debug: 1413 print('logs', logs) 1414 y = self.daily_logs_init() 1415 for i in sorted(logs, reverse=True): 1416 dt = self.time_to_datetime(i) 1417 daily = f'{dt.year}-{dt.month:02d}-{dt.day:02d}' 1418 weekly = dt - timedelta(days=weekday.value) 1419 monthly = f'{dt.year}-{dt.month:02d}' 1420 yearly = dt.year 1421 # daily 1422 if daily not in y['daily']: 1423 y['daily'][daily] = { 1424 'positive': 0, 1425 'negative': 0, 1426 'total': 0, 1427 'rows': [], 1428 } 1429 transfer = len(logs[i]) > 1 1430 if debug: 1431 print('logs[i]', logs[i]) 1432 for z in logs[i]: 1433 if debug: 1434 print('z', z) 1435 # daily 1436 value = z['value'] 1437 if value > 0: 1438 y['daily'][daily]['positive'] += value 1439 else: 1440 y['daily'][daily]['negative'] += -value 1441 y['daily'][daily]['total'] += value 1442 z['transfer'] = transfer 1443 y['daily'][daily]['rows'].append(z) 1444 # weekly 1445 if weekly not in y['weekly']: 1446 y['weekly'][weekly] = { 1447 'positive': 0, 1448 'negative': 0, 1449 'total': 0, 1450 } 1451 if value > 0: 1452 y['weekly'][weekly]['positive'] += value 1453 else: 1454 y['weekly'][weekly]['negative'] += -value 1455 y['weekly'][weekly]['total'] += value 1456 # monthly 1457 if monthly not in y['monthly']: 1458 y['monthly'][monthly] = { 1459 'positive': 0, 1460 'negative': 0, 1461 'total': 0, 1462 } 1463 if value > 0: 1464 y['monthly'][monthly]['positive'] += value 1465 else: 1466 y['monthly'][monthly]['negative'] += -value 1467 y['monthly'][monthly]['total'] += value 1468 # yearly 1469 if yearly not in y['yearly']: 1470 y['yearly'][yearly] = { 1471 'positive': 0, 1472 'negative': 0, 1473 'total': 0, 1474 } 1475 if value > 0: 1476 y['yearly'][yearly]['positive'] += value 1477 else: 1478 y['yearly'][yearly]['negative'] += -value 1479 
y['yearly'][yearly]['total'] += value 1480 if debug: 1481 print('y', y) 1482 return y
Retrieve the daily logs (transactions) from all accounts.
The function groups the logs by day, week, month, and year, and calculates the positive, negative, and total values for each group. It returns a dictionary with the keys 'daily', 'weekly', 'monthly', and 'yearly'; each maps a period key to the totals for that period, and each daily entry also carries the individual log rows.
Parameters:
- weekday (WeekDay): The weekday used to anchor the weekly grouping. Default is WeekDay.Friday.
- debug (bool): Whether to print debug information. Default is False.
Returns: dict: A dictionary containing the daily logs.
Example:
>>> tracker = ZakatTracker()
>>> tracker.sub(51, 'desc', 'account1')
>>> ref = tracker.track(100, 'desc', 'account2')
>>> tracker.add_file('account2', ref, 'file_0')
>>> tracker.add_file('account2', ref, 'file_1')
>>> tracker.add_file('account2', ref, 'file_2')
>>> tracker.daily_logs()
{
'daily': {
'2024-06-30': {
'positive': 100,
'negative': 51,
'total': 49,
'rows': [
{
'account': 'account1',
'desc': 'desc',
'file': {},
'ref': None,
'value': -51,
'time': 1690977015000000000,
'transfer': False,
},
{
'account': 'account2',
'desc': 'desc',
'file': {
1722919011626770944: 'file_0',
1722919011626812928: 'file_1',
1722919011626846976: 'file_2',
},
'ref': None,
'value': 100,
'time': 1690977015000000000,
'transfer': False,
},
],
},
},
'weekly': {
datetime: {
'positive': 100,
'negative': 51,
'total': 49,
},
},
'monthly': {
'2024-06': {
'positive': 100,
'negative': 51,
'total': 49,
},
},
'yearly': {
2024: {
'positive': 100,
'negative': 51,
'total': 49,
},
},
}
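The grouping shown in the example can be sketched without the tracker. This is a simplified illustration under stated assumptions: `group_totals` is a hypothetical helper, timestamps are taken to be nanoseconds since the Unix epoch and are read as UTC (the library's `time_to_datetime` may convert differently), and the weekly bucket and per-day rows are omitted.

```python
from datetime import datetime, timezone


def group_totals(logs: dict) -> dict:
    """Sum log values per day, month, and year.

    `logs` maps nanosecond timestamps to signed transaction values.
    """
    out = {'daily': {}, 'monthly': {}, 'yearly': {}}
    for ts, value in logs.items():
        dt = datetime.fromtimestamp(ts // 10**9, tz=timezone.utc)
        keys = {
            'daily': f'{dt.year}-{dt.month:02d}-{dt.day:02d}',
            'monthly': f'{dt.year}-{dt.month:02d}',
            'yearly': dt.year,
        }
        for period, key in keys.items():
            bucket = out[period].setdefault(key, {'positive': 0, 'negative': 0, 'total': 0})
            if value > 0:
                bucket['positive'] += value
            else:
                bucket['negative'] += -value  # stored as a positive magnitude
            bucket['total'] += value          # signed sum, so 100 and -51 give 49


    return out


summary = group_totals({
    1690977015000000000: -51,
    1690977016000000000: 100,
})
```

Note that `total` is the signed sum of the rows, while `negative` holds the magnitude of the outgoing amounts.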
    def add_file(self, account: str, ref: int, path: str) -> int:
        """
        Adds a file reference to a specific transaction log entry in the vault.

        Parameters:
        account (str): The account number associated with the transaction log.
        ref (int): The reference to the transaction log entry.
        path (str): The path of the file to be added.

        Returns:
        int: The reference of the added file. If the account or transaction log entry does not exist, returns 0.
        """
        if self.account_exists(account):
            if ref in self._vault['account'][account]['log']:
                file_ref = self.time()
                self._vault['account'][account]['log'][ref]['file'][file_ref] = path
                no_lock = self.nolock()
                self.lock()
                self._step(Action.ADD_FILE, account, ref=ref, file=file_ref)
                if no_lock:
                    self.free(self.lock())
                return file_ref
        return 0
Adds a file reference to a specific transaction log entry in the vault.
Parameters:
- account (str): The account number associated with the transaction log.
- ref (int): The reference to the transaction log entry.
- path (str): The path of the file to be added.
Returns: int: The reference of the added file. If the account or transaction log entry does not exist, returns 0.
    def remove_file(self, account: str, ref: int, file_ref: int) -> bool:
        """
        Removes a file reference from a specific transaction log entry in the vault.

        Parameters:
        account (str): The account number associated with the transaction log.
        ref (int): The reference to the transaction log entry.
        file_ref (int): The reference of the file to be removed.

        Returns:
        bool: True if the file reference is successfully removed, False otherwise.
        """
        if self.account_exists(account):
            if ref in self._vault['account'][account]['log']:
                if file_ref in self._vault['account'][account]['log'][ref]['file']:
                    x = self._vault['account'][account]['log'][ref]['file'][file_ref]
                    del self._vault['account'][account]['log'][ref]['file'][file_ref]
                    no_lock = self.nolock()
                    self.lock()
                    self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x)
                    if no_lock:
                        self.free(self.lock())
                    return True
        return False
Removes a file reference from a specific transaction log entry in the vault.
Parameters:
- account (str): The account number associated with the transaction log.
- ref (int): The reference to the transaction log entry.
- file_ref (int): The reference of the file to be removed.
Returns: bool: True if the file reference is successfully removed, False otherwise.
    def balance(self, account: str = 1, cached: bool = True) -> int:
        """
        Calculate and return the balance of a specific account.

        Parameters:
        account (str): The account number. Default is '1'.
        cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True.

        Returns:
        int: The balance of the account.

        Note:
        If cached is True, the function returns the cached balance.
        If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items.
        """
        if cached:
            return self._vault['account'][account]['balance']
        # sum() returns 0 for an account with no boxes instead of raising IndexError
        return sum(y['rest'] for y in self._vault['account'][account]['box'].values())
Calculate and return the balance of a specific account.
Parameters:
- account (str): The account number. Default is '1'.
- cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True.
Returns: int: The balance of the account.
Note: If cached is True, the function returns the cached balance. If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items.
    def hide(self, account, status: bool = None) -> bool:
        """
        Check or set the hide status of a specific account.

        Parameters:
        account (str): The account number.
        status (bool, optional): The new hide status. If not provided, the function will return the current status.

        Returns:
        bool: The current or updated hide status of the account.

        Raises:
        None

        Example:
        >>> tracker = ZakatTracker()
        >>> ref = tracker.track(51, 'desc', 'account1')
        >>> tracker.hide('account1')  # Get the hide status of 'account1' (False by default)
        False
        >>> tracker.hide('account1', True)  # Set the hide status of 'account1' to True
        True
        >>> tracker.hide('account1')  # Get the updated hide status of 'account1'
        True
        >>> tracker.hide('account1', False)
        False
        """
        if self.account_exists(account):
            if status is None:
                return self._vault['account'][account]['hide']
            self._vault['account'][account]['hide'] = status
            return status
        return False
Check or set the hide status of a specific account.
Parameters:
- account (str): The account number.
- status (bool, optional): The new hide status. If not provided, the function will return the current status.
Returns: bool: The current or updated hide status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.hide('account1') # Get the hide status of 'account1' (False by default)
False
>>> tracker.hide('account1', True) # Set the hide status of 'account1' to True
True
>>> tracker.hide('account1') # Get the updated hide status of 'account1'
True
>>> tracker.hide('account1', False)
False
    def zakatable(self, account, status: bool = None) -> bool:
        """
        Check or set the zakatable status of a specific account.

        Parameters:
        account (str): The account number.
        status (bool, optional): The new zakatable status. If not provided, the function will return the current status.

        Returns:
        bool: The current or updated zakatable status of the account.

        Raises:
        None

        Example:
        >>> tracker = ZakatTracker()
        >>> ref = tracker.track(51, 'desc', 'account1')
        >>> tracker.zakatable('account1')  # Get the zakatable status of 'account1' (True by default)
        True
        >>> tracker.zakatable('account1', True)  # Set the zakatable status of 'account1' to True
        True
        >>> tracker.zakatable('account1')  # Get the current zakatable status of 'account1'
        True
        >>> tracker.zakatable('account1', False)
        False
        """
        if self.account_exists(account):
            if status is None:
                return self._vault['account'][account]['zakatable']
            self._vault['account'][account]['zakatable'] = status
            return status
        return False
Check or set the zakatable status of a specific account.
Parameters:
- account (str): The account number.
- status (bool, optional): The new zakatable status. If not provided, the function will return the current status.
Returns: bool: The current or updated zakatable status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.zakatable('account1') # Get the zakatable status of 'account1' (True by default)
True
>>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True
True
>>> tracker.zakatable('account1') # Get the current zakatable status of 'account1'
True
>>> tracker.zakatable('account1', False)
False
1619 def sub(self, unscaled_value: float | int | Decimal, desc: str = '', account: str = 1, created: int = None, 1620 debug: bool = False) \ 1621 -> tuple[ 1622 int, 1623 list[ 1624 tuple[int, int], 1625 ], 1626 ] | tuple: 1627 """ 1628 Subtracts a specified value from an account's balance. 1629 1630 Parameters: 1631 unscaled_value (float | int | Decimal): The amount to be subtracted. 1632 desc (str): A description for the transaction. Defaults to an empty string. 1633 account (str): The account from which the value will be subtracted. Defaults to '1'. 1634 created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. 1635 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1636 1637 Returns: 1638 tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction. 1639 1640 If the amount to subtract is greater than the account's balance, 1641 the remaining amount will be transferred to a new transaction with a negative value. 1642 1643 Raises: 1644 ValueError: The box transaction happened again in the same nanosecond time. 1645 ValueError: The log transaction happened again in the same nanosecond time. 
1646 """ 1647 if debug: 1648 print('sub', f'debug={debug}') 1649 if unscaled_value < 0: 1650 return tuple() 1651 if unscaled_value == 0: 1652 ref = self.track(unscaled_value, '', account) 1653 return ref, ref 1654 if created is None: 1655 created = self.time() 1656 no_lock = self.nolock() 1657 self.lock() 1658 self.track(0, '', account) 1659 value = self.scale(unscaled_value) 1660 self._log(value=-value, desc=desc, account=account, created=created, ref=None, debug=debug) 1661 ids = sorted(self._vault['account'][account]['box'].keys()) 1662 limit = len(ids) + 1 1663 target = value 1664 if debug: 1665 print('ids', ids) 1666 ages = [] 1667 for i in range(-1, -limit, -1): 1668 if target == 0: 1669 break 1670 j = ids[i] 1671 if debug: 1672 print('i', i, 'j', j) 1673 rest = self._vault['account'][account]['box'][j]['rest'] 1674 if rest >= target: 1675 self._vault['account'][account]['box'][j]['rest'] -= target 1676 self._step(Action.SUB, account, ref=j, value=target) 1677 ages.append((j, target)) 1678 target = 0 1679 break 1680 elif target > rest > 0: 1681 chunk = rest 1682 target -= chunk 1683 self._step(Action.SUB, account, ref=j, value=chunk) 1684 ages.append((j, chunk)) 1685 self._vault['account'][account]['box'][j]['rest'] = 0 1686 if target > 0: 1687 self.track( 1688 unscaled_value=self.unscale(-target), 1689 desc=desc, 1690 account=account, 1691 logging=False, 1692 created=created, 1693 ) 1694 ages.append((created, target)) 1695 if no_lock: 1696 self.free(self.lock()) 1697 return created, ages
Subtracts a specified value from an account's balance.
Parameters:
- unscaled_value (float | int | Decimal): The amount to be subtracted.
- desc (str): A description for the transaction. Defaults to an empty string.
- account (str): The account from which the value will be subtracted. Defaults to '1'.
- created (int): The timestamp of the transaction. If not provided, the current timestamp will be used.
- debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: tuple: A tuple containing the timestamp of the transaction and a list of (box timestamp, amount) tuples recording how much was taken from each box. An empty tuple is returned if the value is negative.
Boxes are consumed from the newest to the oldest. If the amount to subtract is greater than the account's balance, the remaining amount is recorded as a new box with a negative value.
Raises:
- ValueError: If a box already exists for the same nanosecond timestamp.
- ValueError: If a log entry already exists for the same nanosecond timestamp.
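The way a subtraction is spread over existing boxes can be sketched as below. `consume` is a hypothetical helper written for this illustration; it covers only the newest-first walk over the boxes and leaves out logging, locking, history steps, and the creation of the negative box for any shortfall.

```python
def consume(boxes: dict, amount: int) -> tuple:
    """Take `amount` from the boxes, newest timestamp first.

    Returns (ages, shortfall): `ages` lists (box timestamp, amount taken)
    pairs, and `shortfall` is whatever the boxes could not cover.
    """
    ages = []
    target = amount
    for ref in sorted(boxes, reverse=True):  # newest box first
        if target == 0:
            break
        rest = boxes[ref]['rest']
        if rest <= 0:
            continue  # empty or negative boxes contribute nothing
        chunk = min(rest, target)
        boxes[ref]['rest'] -= chunk
        target -= chunk
        ages.append((ref, chunk))
    return ages, target


boxes = {1: {'rest': 50}, 2: {'rest': 30}}
ages, shortfall = consume(boxes, 60)  # drains box 2, then takes 30 from box 1
```

A non-zero shortfall corresponds to the case where the method records the remainder as a new box with a negative value.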
1699 def transfer(self, unscaled_amount: float | int | Decimal, from_account: str, to_account: str, desc: str = '', 1700 created: int = None, 1701 debug: bool = False) -> list[int]: 1702 """ 1703 Transfers a specified value from one account to another. 1704 1705 Parameters: 1706 unscaled_amount (float | int | Decimal): The amount to be transferred. 1707 from_account (str): The account from which the value will be transferred. 1708 to_account (str): The account to which the value will be transferred. 1709 desc (str, optional): A description for the transaction. Defaults to an empty string. 1710 created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. 1711 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1712 1713 Returns: 1714 list[int]: A list of timestamps corresponding to the transactions made during the transfer. 1715 1716 Raises: 1717 ValueError: Transfer to the same account is forbidden. 1718 ValueError: The box transaction happened again in the same nanosecond time. 1719 ValueError: The log transaction happened again in the same nanosecond time. 1720 """ 1721 if debug: 1722 print('transfer', f'debug={debug}') 1723 if from_account == to_account: 1724 raise ValueError(f'Transfer to the same account is forbidden. 
{to_account}') 1725 if unscaled_amount <= 0: 1726 return [] 1727 if created is None: 1728 created = self.time() 1729 (_, ages) = self.sub(unscaled_amount, desc, from_account, created, debug=debug) 1730 times = [] 1731 source_exchange = self.exchange(from_account, created) 1732 target_exchange = self.exchange(to_account, created) 1733 1734 if debug: 1735 print('ages', ages) 1736 1737 for age, value in ages: 1738 target_amount = int(self.exchange_calc(value, source_exchange['rate'], target_exchange['rate'])) 1739 if debug: 1740 print('target_amount', target_amount) 1741 # Perform the transfer 1742 if self.box_exists(to_account, age): 1743 if debug: 1744 print('box_exists', age) 1745 capital = self._vault['account'][to_account]['box'][age]['capital'] 1746 rest = self._vault['account'][to_account]['box'][age]['rest'] 1747 if debug: 1748 print( 1749 f"Transfer(loop) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1750 selected_age = age 1751 if rest + target_amount > capital: 1752 self._vault['account'][to_account]['box'][age]['capital'] += target_amount 1753 selected_age = ZakatTracker.time() 1754 self._vault['account'][to_account]['box'][age]['rest'] += target_amount 1755 self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount) 1756 y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account, 1757 created=None, ref=None, debug=debug) 1758 times.append((age, y)) 1759 continue 1760 if debug: 1761 print( 1762 f"Transfer(func) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1763 y = self.track( 1764 unscaled_value=self.unscale(int(target_amount)), 1765 desc=desc, 1766 account=to_account, 1767 logging=True, 1768 created=age, 1769 debug=debug, 1770 ) 1771 times.append(y) 1772 return times
Transfers a specified value from one account to another.
Parameters:
- unscaled_amount (float | int | Decimal): The amount to be transferred.
- from_account (str): The account from which the value will be transferred.
- to_account (str): The account to which the value will be transferred.
- desc (str, optional): A description for the transaction. Defaults to an empty string.
- created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used.
- debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: list[int]: A list of timestamps corresponding to the transactions made during the transfer. When the destination account already holds a box with the same timestamp, the entry is a (box timestamp, log timestamp) tuple instead. An empty list is returned if the amount is not positive.
Raises:
- ValueError: If the source and destination accounts are the same.
- ValueError: If a box already exists for the same nanosecond timestamp.
- ValueError: If a log entry already exists for the same nanosecond timestamp.
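A transfer keeps the age of each amount it moves, and converts the amount using the two accounts' exchange rates. The sketch below shows only that conversion step, assuming the list of (box timestamp, amount) pairs has already been produced by the subtraction; `convert_ages` is a hypothetical name, and the rates are illustrative.

```python
def convert_ages(ages: list, source_rate: float, target_rate: float) -> list:
    """Convert each (box timestamp, amount) pair into the target currency.

    The timestamp is kept unchanged so the amount keeps its original age
    in the destination account; the amount is truncated to an integer.
    """
    return [
        (age, int((value * source_rate) / target_rate))
        for age, value in ages
    ]


# Two chunks of 30 taken from boxes created at timestamps 2 and 1,
# moved from an account quoted at 3.75 into a base-currency account.
converted = convert_ages([(2, 30), (1, 30)], source_rate=3.75, target_rate=1)
```

Because each chunk is truncated separately, the converted total can be slightly lower than converting the whole amount at once.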
1774 def check(self, 1775 silver_gram_price: float, 1776 unscaled_nisab: float | int | Decimal = None, 1777 debug: bool = False, 1778 now: int = None, 1779 cycle: float = None) -> tuple: 1780 """ 1781 Check the eligibility for Zakat based on the given parameters. 1782 1783 Parameters: 1784 silver_gram_price (float): The price of a gram of silver. 1785 unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, 1786 it will be calculated based on the silver_gram_price. 1787 debug (bool): Flag to enable debug mode. 1788 now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). 1789 cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle(). 1790 1791 Returns: 1792 tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, 1793 and a dictionary containing the Zakat plan. 1794 """ 1795 if debug: 1796 print('check', f'debug={debug}') 1797 if now is None: 1798 now = self.time() 1799 if cycle is None: 1800 cycle = ZakatTracker.TimeCycle() 1801 if unscaled_nisab is None: 1802 unscaled_nisab = ZakatTracker.Nisab(silver_gram_price) 1803 nisab = self.scale(unscaled_nisab) 1804 plan = {} 1805 below_nisab = 0 1806 brief = [0, 0, 0] 1807 valid = False 1808 if debug: 1809 print('exchanges', self.exchanges()) 1810 for x in self._vault['account']: 1811 if not self.zakatable(x): 1812 continue 1813 _box = self._vault['account'][x]['box'] 1814 _log = self._vault['account'][x]['log'] 1815 limit = len(_box) + 1 1816 ids = sorted(self._vault['account'][x]['box'].keys()) 1817 for i in range(-1, -limit, -1): 1818 j = ids[i] 1819 rest = float(_box[j]['rest']) 1820 if rest <= 0: 1821 continue 1822 exchange = self.exchange(x, created=self.time()) 1823 rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1) 1824 brief[0] += rest 1825 index = limit + i - 1 1826 epoch = (now - j) / cycle 1827 if debug: 
1828 print(f"Epoch: {epoch}", _box[j]) 1829 if _box[j]['last'] > 0: 1830 epoch = (now - _box[j]['last']) / cycle 1831 if debug: 1832 print(f"Epoch: {epoch}") 1833 epoch = floor(epoch) 1834 if debug: 1835 print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch) 1836 if epoch == 0: 1837 continue 1838 if debug: 1839 print("Epoch - PASSED") 1840 brief[1] += rest 1841 if rest >= nisab: 1842 total = 0 1843 for _ in range(epoch): 1844 total += ZakatTracker.ZakatCut(float(rest) - float(total)) 1845 if total > 0: 1846 if x not in plan: 1847 plan[x] = {} 1848 valid = True 1849 brief[2] += total 1850 plan[x][index] = { 1851 'total': total, 1852 'count': epoch, 1853 'box_time': j, 1854 'box_capital': _box[j]['capital'], 1855 'box_rest': _box[j]['rest'], 1856 'box_last': _box[j]['last'], 1857 'box_total': _box[j]['total'], 1858 'box_count': _box[j]['count'], 1859 'box_log': _log[j]['desc'], 1860 'exchange_rate': exchange['rate'], 1861 'exchange_time': exchange['time'], 1862 'exchange_desc': exchange['description'], 1863 } 1864 else: 1865 chunk = ZakatTracker.ZakatCut(float(rest)) 1866 if chunk > 0: 1867 if x not in plan: 1868 plan[x] = {} 1869 if j not in plan[x].keys(): 1870 plan[x][index] = {} 1871 below_nisab += rest 1872 brief[2] += chunk 1873 plan[x][index]['below_nisab'] = chunk 1874 plan[x][index]['total'] = chunk 1875 plan[x][index]['count'] = epoch 1876 plan[x][index]['box_time'] = j 1877 plan[x][index]['box_capital'] = _box[j]['capital'] 1878 plan[x][index]['box_rest'] = _box[j]['rest'] 1879 plan[x][index]['box_last'] = _box[j]['last'] 1880 plan[x][index]['box_total'] = _box[j]['total'] 1881 plan[x][index]['box_count'] = _box[j]['count'] 1882 plan[x][index]['box_log'] = _log[j]['desc'] 1883 plan[x][index]['exchange_rate'] = exchange['rate'] 1884 plan[x][index]['exchange_time'] = exchange['time'] 1885 plan[x][index]['exchange_desc'] = exchange['description'] 1886 valid = valid or below_nisab >= nisab 1887 if debug: 1888 print(f"below_nisab({below_nisab}) >= 
nisab({nisab})") 1889 return valid, brief, plan
Check the eligibility for Zakat based on the given parameters.
Parameters:
    silver_gram_price (float): The price of a gram of silver.
    unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, it will be calculated based on the silver_gram_price.
    debug (bool): Flag to enable debug mode.
    now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time().
    cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle().
Returns:
    tuple: (valid, brief, plan). valid is a boolean indicating whether Zakat is due. brief is a list of three running totals: the sum of all positive box remainders, the sum of the remainders of boxes that have completed at least one cycle, and the total Zakat due. plan is a dictionary containing the Zakat plan, keyed by account.
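The per-box loop in `check` applies the Zakat cut once for every completed cycle, each time to what remains after the earlier cuts. The following is a minimal sketch of that step only, not the library's implementation. It assumes a flat 2.5% cut: `zakat_cut` and `due_for_box` are hypothetical names, and the real `ZakatTracker.ZakatCut` is not shown in this section.

```python
from math import floor


def zakat_cut(amount: float) -> float:
    """Hypothetical stand-in for ZakatTracker.ZakatCut (assumed flat 2.5%)."""
    return 0.025 * amount


def due_for_box(rest: float, age: int, cycle: int) -> tuple:
    """Return (completed_cycles, total_due) for a single box.

    `age` and `cycle` must use the same unit (the library works in nanoseconds).
    """
    epoch = floor(age / cycle)  # number of fully completed cycles
    total = 0.0
    for _ in range(epoch):
        # Each cycle's cut applies to what is left after the earlier cuts.
        total += zakat_cut(rest - total)
    return epoch, total


cycles, due = due_for_box(rest=1000.0, age=25, cycle=10)
print(cycles, round(due, 4))
```

A box younger than one full cycle yields zero cycles and nothing due, which matches the `if epoch == 0: continue` branch in the listing.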
    def build_payment_parts(self, scaled_demand: int, positive_only: bool = True) -> dict:
        """
        Build payment parts for the Zakat distribution.

        Parameters:
            scaled_demand (int): The total demand for payment in local currency.
            positive_only (bool): If True, only consider accounts with positive balance. Default is True.

        Returns:
            dict: A dictionary containing the payment parts for each account. The dictionary has the following structure:
                {
                    'account': {
                        'account_id': {'balance': float, 'rate': float, 'part': float},
                        ...
                    },
                    'exceed': bool,
                    'demand': int,
                    'total': float,
                }
        """
        total = 0
        parts = {
            'account': {},
            'exceed': False,
            'demand': int(round(scaled_demand)),
        }
        for x, y in self.accounts().items():
            if positive_only and y <= 0:
                continue
            total += float(y)
            exchange = self.exchange(x)
            parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0}
        parts['total'] = total
        return parts
Build payment parts for the Zakat distribution.
Parameters:
    scaled_demand (int): The total demand for payment in local currency.
    positive_only (bool): If True, only consider accounts with positive balance. Default is True.
Returns:
    dict: A dictionary containing the payment parts for each account. The dictionary has the following structure:
        {
            'account': {
                'account_id': {'balance': float, 'rate': float, 'part': float},
                ...
            },
            'exceed': bool,
            'demand': int,
            'total': float,
        }
1926 @staticmethod 1927 def check_payment_parts(parts: dict, debug: bool = False) -> int: 1928 """ 1929 Checks the validity of payment parts. 1930 1931 Parameters: 1932 parts (dict): A dictionary containing payment parts information. 1933 debug (bool): Flag to enable debug mode. 1934 1935 Returns: 1936 int: Returns 0 if the payment parts are valid, otherwise returns the error code. 1937 1938 Error Codes: 1939 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 1940 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 1941 3: 'part' value in parts['account'][x] is less than 0. 1942 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 1943 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 1944 6: The sum of 'part' values in parts['account'] does not match with 'demand' value. 1945 """ 1946 if debug: 1947 print('check_payment_parts', f'debug={debug}') 1948 for i in ['demand', 'account', 'total', 'exceed']: 1949 if i not in parts: 1950 return 1 1951 exceed = parts['exceed'] 1952 for x in parts['account']: 1953 for j in ['balance', 'rate', 'part']: 1954 if j not in parts['account'][x]: 1955 return 2 1956 if parts['account'][x]['part'] < 0: 1957 return 3 1958 if not exceed and parts['account'][x]['balance'] <= 0: 1959 return 4 1960 demand = parts['demand'] 1961 z = 0 1962 for _, y in parts['account'].items(): 1963 if not exceed and y['part'] > y['balance']: 1964 return 5 1965 z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1) 1966 z = round(z, 2) 1967 demand = round(demand, 2) 1968 if debug: 1969 print('check_payment_parts', f'z = {z}, demand = {demand}') 1970 print('check_payment_parts', type(z), type(demand)) 1971 print('check_payment_parts', z != demand) 1972 print('check_payment_parts', str(z) != str(demand)) 1973 if z != demand and str(z) != str(demand): 1974 return 6 1975 return 0
Checks the validity of payment parts.
Parameters:
    parts (dict): A dictionary containing payment parts information.
    debug (bool): Flag to enable debug mode.
Returns:
    int: Returns 0 if the payment parts are valid, otherwise returns the error code.
Error Codes:
    1: 'demand', 'account', 'total', or 'exceed' key is missing in parts.
    2: 'balance', 'rate' or 'part' key is missing in parts['account'][x].
    3: 'part' value in parts['account'][x] is less than 0.
    4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0.
    5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value.
    6: The sum of 'part' values in parts['account'] does not match with 'demand' value.
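To make the error codes concrete, here is a simplified, self-contained restatement of the validation order, followed by a hand-filled `parts` dictionary of the shape `build_payment_parts` returns. It is a sketch rather than the library code: `check_parts` is a hypothetical name, and converting a part to the base currency as `part * rate` is an assumption, since `ZakatTracker.exchange_calc` is not shown in this section.

```python
def check_parts(parts: dict) -> int:
    """Return 0 when the parts are valid, otherwise the documented error code."""
    for key in ('demand', 'account', 'total', 'exceed'):
        if key not in parts:
            return 1
    exceed = parts['exceed']
    for entry in parts['account'].values():
        for key in ('balance', 'rate', 'part'):
            if key not in entry:
                return 2
        if entry['part'] < 0:
            return 3
        if not exceed and entry['balance'] <= 0:
            return 4
    paid = 0.0
    for entry in parts['account'].values():
        if not exceed and entry['part'] > entry['balance']:
            return 5
        # Assumption: a part converts to the base currency as part * rate.
        paid += entry['part'] * entry['rate']
    if round(paid, 2) != round(parts['demand'], 2):
        return 6
    return 0


parts = {
    'account': {
        'cash': {'balance': 500, 'rate': 1.0, 'part': 100},
        'bank': {'balance': 900, 'rate': 1.0, 'part': 150},
    },
    'exceed': False,
    'demand': 250,
    'total': 1400.0,
}
print(check_parts(parts))  # 0: the two parts add up to the demand
```

Setting `'exceed': True` skips checks 4 and 5, so an account may then be asked to pay more than its balance.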
1977 def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool: 1978 """ 1979 Perform Zakat calculation based on the given report and optional parts. 1980 1981 Parameters: 1982 report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. 1983 parts (dict): A dictionary containing the payment parts for the zakat. 1984 debug (bool): A flag indicating whether to print debug information. 1985 1986 Returns: 1987 bool: True if the zakat calculation is successful, False otherwise. 1988 """ 1989 if debug: 1990 print('zakat', f'debug={debug}') 1991 valid, _, plan = report 1992 if not valid: 1993 return valid 1994 parts_exist = parts is not None 1995 if parts_exist: 1996 if self.check_payment_parts(parts, debug=debug) != 0: 1997 return False 1998 if debug: 1999 print('######### zakat #######') 2000 print('parts_exist', parts_exist) 2001 no_lock = self.nolock() 2002 self.lock() 2003 report_time = self.time() 2004 self._vault['report'][report_time] = report 2005 self._step(Action.REPORT, ref=report_time) 2006 created = self.time() 2007 for x in plan: 2008 target_exchange = self.exchange(x) 2009 if debug: 2010 print(plan[x]) 2011 print('-------------') 2012 print(self._vault['account'][x]['box']) 2013 ids = sorted(self._vault['account'][x]['box'].keys()) 2014 if debug: 2015 print('plan[x]', plan[x]) 2016 for i in plan[x].keys(): 2017 j = ids[i] 2018 if debug: 2019 print('i', i, 'j', j) 2020 self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'], 2021 key='last', 2022 math_operation=MathOperation.EQUAL) 2023 self._vault['account'][x]['box'][j]['last'] = created 2024 amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate'])) 2025 self._vault['account'][x]['box'][j]['total'] += amount 2026 self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total', 2027 math_operation=MathOperation.ADDITION) 2028 
self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count'] 2029 self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count', 2030 math_operation=MathOperation.ADDITION) 2031 if not parts_exist: 2032 try: 2033 self._vault['account'][x]['box'][j]['rest'] -= amount 2034 except TypeError: 2035 self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount) 2036 # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest', 2037 # math_operation=MathOperation.SUBTRACTION) 2038 self._log(-float(amount), desc='zakat-زكاة', account=x, created=None, ref=j, debug=debug) 2039 if parts_exist: 2040 for account, part in parts['account'].items(): 2041 if part['part'] == 0: 2042 continue 2043 if debug: 2044 print('zakat-part', account, part['rate']) 2045 target_exchange = self.exchange(account) 2046 amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate']) 2047 self.sub( 2048 unscaled_value=self.unscale(int(amount)), 2049 desc='zakat-part-دفعة-زكاة', 2050 account=account, 2051 debug=debug, 2052 ) 2053 if no_lock: 2054 self.free(self.lock()) 2055 return True
Perform Zakat calculation based on the given report and optional parts.
Parameters:
    report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan.
    parts (dict): A dictionary containing the payment parts for the zakat.
    debug (bool): A flag indicating whether to print debug information.
Returns: bool: True if the zakat calculation is successful, False otherwise.
    def export_json(self, path: str = "data.json") -> bool:
        """
        Exports the current state of the ZakatTracker object to a JSON file.

        Parameters:
            path (str): The path where the JSON file will be saved. Default is "data.json".

        Returns:
            bool: True if the export is successful, False otherwise.

        Raises:
            No specific exceptions are raised by this method.
        """
        with open(path, "w") as file:
            json.dump(self._vault, file, indent=4, cls=JSONEncoder)
            return True
Exports the current state of the ZakatTracker object to a JSON file.
Parameters: path (str): The path where the JSON file will be saved. Default is "data.json".
Returns: bool: True if the export is successful, False otherwise.
Raises: This method raises no exceptions of its own; errors from opening or writing the file (for example OSError) propagate to the caller.
    def save(self, path: str = None) -> bool:
        """
        Saves the ZakatTracker's current state to a camel file.

        This method serializes the internal data (`_vault`).

        Parameters:
            path (str, optional): File path for saving. Defaults to a predefined location.

        Returns:
            bool: True if the save operation is successful, False otherwise.
        """
        if path is None:
            path = self.path()
        with open(f'{path}.tmp', 'w') as stream:
            # first save in tmp file
            stream.write(camel.dump(self._vault))
        # then move tmp file to original location
        shutil.move(f'{path}.tmp', path)
        return True
Saves the ZakatTracker's current state to a camel file.
This method serializes the internal data (`_vault`).
Parameters: path (str, optional): File path for saving. Defaults to a predefined location.
Returns: bool: True if the save operation is successful, False otherwise.
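`save` writes the serialized state to a temporary file and only then moves it over the target, so a failed write does not leave a half-written database behind. Here is a minimal sketch of that write-then-move pattern. It uses `json` as a stand-in for the camel serializer, and `save_state` is a hypothetical name, not part of the library.

```python
import json
import os
import shutil
import tempfile


def save_state(state: dict, path: str) -> bool:
    """Write the state to '<path>.tmp' first, then move it over the target."""
    tmp_path = f'{path}.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as stream:
        json.dump(state, stream)  # stand-in for camel.dump(...)
    # The file is closed (and flushed) before it replaces the old copy.
    shutil.move(tmp_path, path)
    return True


target = os.path.join(tempfile.mkdtemp(), 'vault.json')
save_state({'account': {}}, target)
with open(target, encoding='utf-8') as stream:
    print(json.load(stream))
```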
    def load(self, path: str = None) -> bool:
        """
        Load the current state of the ZakatTracker object from a camel file.

        Parameters:
            path (str): The path where the camel file is located. If not provided, it will use the default path.

        Returns:
            bool: True if the load operation is successful, False otherwise.
        """
        if path is None:
            path = self.path()
        if os.path.exists(path):
            with open(path, 'r') as stream:
                self._vault = camel.load(stream.read())
                return True
        return False
Load the current state of the ZakatTracker object from a camel file.
Parameters: path (str): The path where the camel file is located. If not provided, it will use the default path.
Returns: bool: True if the load operation is successful, False otherwise.
    def import_csv_cache_path(self):
        """
        Generates the cache file path for imported CSV data.

        This function constructs the file path where cached data from CSV imports
        will be stored. The cache file is a camel file (.camel extension) appended
        to the base path of the object.

        Returns:
            str: The full path to the import CSV cache file.

        Example:
            >>> obj = ZakatTracker('/data/reports')
            >>> obj.import_csv_cache_path()
            '/data/reports.import_csv.camel'
        """
        path = str(self.path())
        ext = self.ext()
        ext_len = len(ext)
        if path.endswith(f'.{ext}'):
            path = path[:-ext_len - 1]
        _, filename = os.path.split(path + f'.import_csv.{ext}')
        return self.base_path(filename)
Generates the cache file path for imported CSV data.
This function constructs the file path where cached data from CSV imports will be stored. The cache file is a camel file (.camel extension) appended to the base path of the object.
Returns: str: The full path to the import CSV cache file.
Example:
    >>> obj = ZakatTracker('/data/reports')
    >>> obj.import_csv_cache_path()
    '/data/reports.import_csv.camel'
2137 def import_csv(self, path: str = 'file.csv', scale_decimal_places: int = 0, debug: bool = False) -> tuple: 2138 """ 2139 The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system. 2140 2141 Parameters: 2142 path (str): The path to the CSV file. Default is 'file.csv'. 2143 scale_decimal_places (int): The number of decimal places to scale the value. Default is 0. 2144 debug (bool): A flag indicating whether to print debug information. 2145 2146 Returns: 2147 tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, 2148 and a dictionary of bad transactions. 2149 2150 Notes: 2151 * Currency Pair Assumption: This function assumes that the exchange rates stored for each account 2152 are appropriate for the currency pairs involved in the conversions. 2153 * The exchange rate for each account is based on the last encountered transaction rate that is not equal 2154 to 1.0 or the previous rate for that account. 2155 * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent 2156 transactions of the same account within the whole imported and existing dataset when doing `check` and 2157 `zakat` operations. 
2158 2159 Example Usage: 2160 The CSV file should have the following format, rate is optional per transaction: 2161 account, desc, value, date, rate 2162 For example: 2163 safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1 2164 """ 2165 if debug: 2166 print('import_csv', f'debug={debug}') 2167 cache: list[int] = [] 2168 try: 2169 with open(self.import_csv_cache_path(), 'r') as stream: 2170 cache = camel.load(stream.read()) 2171 except: 2172 pass 2173 date_formats = [ 2174 "%Y-%m-%d %H:%M:%S", 2175 "%Y-%m-%dT%H:%M:%S", 2176 "%Y-%m-%dT%H%M%S", 2177 "%Y-%m-%d", 2178 ] 2179 created, found, bad = 0, 0, {} 2180 data: dict[int, list] = {} 2181 with open(path, newline='', encoding="utf-8") as f: 2182 i = 0 2183 for row in csv.reader(f, delimiter=','): 2184 i += 1 2185 hashed = hash(tuple(row)) 2186 if hashed in cache: 2187 found += 1 2188 continue 2189 account = row[0] 2190 desc = row[1] 2191 value = float(row[2]) 2192 rate = 1.0 2193 if row[4:5]: # Empty list if index is out of range 2194 rate = float(row[4]) 2195 date: int = 0 2196 for time_format in date_formats: 2197 try: 2198 date = self.time(datetime.datetime.strptime(row[3], time_format)) 2199 break 2200 except: 2201 pass 2202 # TODO: not allowed for negative dates in the future after enhance time functions 2203 if date == 0: 2204 bad[i] = row + ['invalid date'] 2205 if value == 0: 2206 bad[i] = row + ['invalid value'] 2207 continue 2208 if date not in data: 2209 data[date] = [] 2210 data[date].append((i, account, desc, value, date, rate, hashed)) 2211 2212 if debug: 2213 print('import_csv', len(data)) 2214 2215 if bad: 2216 return created, found, bad 2217 2218 for date, rows in sorted(data.items()): 2219 try: 2220 len_rows = len(rows) 2221 if len_rows == 1: 2222 (_, account, desc, unscaled_value, date, rate, hashed) = rows[0] 2223 value = self.unscale( 2224 unscaled_value, 2225 decimal_places=scale_decimal_places, 2226 ) if scale_decimal_places > 0 else unscaled_value 2227 if rate > 0: 2228 
self.exchange(account=account, created=date, rate=rate) 2229 if value > 0: 2230 self.track(unscaled_value=value, desc=desc, account=account, logging=True, created=date) 2231 elif value < 0: 2232 self.sub(unscaled_value=-value, desc=desc, account=account, created=date) 2233 created += 1 2234 cache.append(hashed) 2235 continue 2236 if debug: 2237 print('-- Duplicated time detected', date, 'len', len_rows) 2238 print(rows) 2239 print('---------------------------------') 2240 # If records are found at the same time with different accounts in the same amount 2241 # (one positive and the other negative), this indicates it is a transfer. 2242 if len_rows != 2: 2243 raise Exception(f'more than two transactions({len_rows}) at the same time') 2244 (i, account1, desc1, unscaled_value1, date1, rate1, _) = rows[0] 2245 (j, account2, desc2, unscaled_value2, date2, rate2, _) = rows[1] 2246 if account1 == account2 or desc1 != desc2 or abs(unscaled_value1) != abs( 2247 unscaled_value2) or date1 != date2: 2248 raise Exception('invalid transfer') 2249 if rate1 > 0: 2250 self.exchange(account1, created=date1, rate=rate1) 2251 if rate2 > 0: 2252 self.exchange(account2, created=date2, rate=rate2) 2253 value1 = self.unscale( 2254 unscaled_value1, 2255 decimal_places=scale_decimal_places, 2256 ) if scale_decimal_places > 0 else unscaled_value1 2257 value2 = self.unscale( 2258 unscaled_value2, 2259 decimal_places=scale_decimal_places, 2260 ) if scale_decimal_places > 0 else unscaled_value2 2261 values = { 2262 value1: account1, 2263 value2: account2, 2264 } 2265 self.transfer( 2266 unscaled_amount=abs(value1), 2267 from_account=values[min(values.keys())], 2268 to_account=values[max(values.keys())], 2269 desc=desc1, 2270 created=date1, 2271 ) 2272 except Exception as e: 2273 for (i, account, desc, value, date, rate, _) in rows: 2274 bad[i] = (account, desc, value, date, rate, e) 2275 break 2276 with open(self.import_csv_cache_path(), 'w') as stream: 2277 stream.write(camel.dump(cache)) 2278 
return created, found, bad
The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system.
Parameters:
    path (str): The path to the CSV file. Default is 'file.csv'.
    scale_decimal_places (int): The number of decimal places to scale the value. Default is 0.
    debug (bool): A flag indicating whether to print debug information.
Returns: tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, and a dictionary of bad transactions.
Notes:
* Currency Pair Assumption: This function assumes that the exchange rates stored for each account are appropriate for the currency pairs involved in the conversions.
* The exchange rate for each account is based on the last encountered transaction rate that is not equal to 1.0 or the previous rate for that account.
* Those rates will be merged into the main exchange rates data, and they will later be used for all subsequent transactions of the same account within the whole imported and existing dataset when doing `check` and `zakat` operations.
Example Usage:
    The CSV file should have the following format, rate is optional per transaction:
        account, desc, value, date, rate
    For example:
        safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1
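The row handling described above can be sketched in isolation: each row is split into named fields, the rate column is optional, and the date is tried against several formats in turn. The format list is taken from the listing; `parse_date` and `parse_row` are hypothetical helper names. Stripping whitespace around the date is an addition of this sketch, not something the listing does.

```python
import datetime

DATE_FORMATS = [
    '%Y-%m-%d %H:%M:%S',
    '%Y-%m-%dT%H:%M:%S',
    '%Y-%m-%dT%H%M%S',
    '%Y-%m-%d',
]


def parse_date(text: str):
    """Return the first successful parse, or None if no format matches."""
    for fmt in DATE_FORMATS:
        try:
            # .strip() is this sketch's own addition (tolerates padded cells).
            return datetime.datetime.strptime(text.strip(), fmt)
        except ValueError:
            continue
    return None


def parse_row(row: list) -> dict:
    """Split one CSV row into named fields; the fifth column (rate) is optional."""
    rate = float(row[4]) if row[4:5] else 1.0  # row[4:5] is empty when absent
    return {
        'account': row[0],
        'desc': row[1],
        'value': float(row[2]),
        'date': parse_date(row[3]),
        'rate': rate,
    }


print(parse_row(['safe-45', 'Some text', '34872', '1988-06-30 00:00:00', '1']))
print(parse_row(['safe-45', 'Some text', '-120', '1988-07-01']))
```

A row whose date matches none of the formats comes back with `date` set to `None`, which corresponds to the 'invalid date' entries that `import_csv` collects in its dictionary of bad transactions.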
    @staticmethod
    def human_readable_size(size: float, decimal_places: int = 2) -> str:
        """
        Converts a size in bytes to a human-readable format (e.g., KB, MB, GB).

        This function iterates through progressively larger units of information
        (B, KB, MB, GB, etc.) and divides the input size until it fits within a
        range that can be expressed with a reasonable number before the unit.

        Parameters:
            size (float): The size in bytes to convert.
            decimal_places (int, optional): The number of decimal places to display
                in the result. Defaults to 2.

        Returns:
            str: A string representation of the size in a human-readable format,
                rounded to the specified number of decimal places. For example:
                    - "1.50 KB" (1536 bytes)
                    - "23.00 MB" (24117248 bytes)
                    - "1.23 GB" (1325899906 bytes)
        """
        if type(size) not in (float, int):
            raise TypeError("size must be a float or integer")
        if type(decimal_places) != int:
            raise TypeError("decimal_places must be an integer")
        for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
            if size < 1024.0:
                break
            size /= 1024.0
        return f"{size:.{decimal_places}f} {unit}"
Converts a size in bytes to a human-readable format (e.g., KB, MB, GB).
This function iterates through progressively larger units of information (B, KB, MB, GB, etc.) and divides the input size until it fits within a range that can be expressed with a reasonable number before the unit.
Parameters:
    size (float): The size in bytes to convert.
    decimal_places (int, optional): The number of decimal places to display in the result. Defaults to 2.
Returns:
    str: A string representation of the size in a human-readable format, rounded to the specified number of decimal places. For example:
        - "1.50 KB" (1536 bytes)
        - "23.00 MB" (24117248 bytes)
        - "1.23 GB" (1325899906 bytes)
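The unit loop is easy to try on its own. The sketch below copies the logic from the listing into a standalone function (the type checks are omitted for brevity) and runs the three documented examples:

```python
def human_readable_size(size: float, decimal_places: int = 2) -> str:
    """Divide by 1024 until the value fits the current unit."""
    unit = 'B'
    for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
        if size < 1024.0:
            break
        size /= 1024.0
    return f'{size:.{decimal_places}f} {unit}'


for sample in (1536, 24117248, 1325899906):
    print(sample, '->', human_readable_size(sample))
# 1536 -> 1.50 KB
# 24117248 -> 23.00 MB
# 1325899906 -> 1.23 GB
```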
2315 @staticmethod 2316 def get_dict_size(obj: dict, seen: set = None) -> float: 2317 """ 2318 Recursively calculates the approximate memory size of a dictionary and its contents in bytes. 2319 2320 This function traverses the dictionary structure, accounting for the size of keys, values, 2321 and any nested objects. It handles various data types commonly found in dictionaries 2322 (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case 2323 of circular references. 2324 2325 Parameters: 2326 obj (dict): The dictionary whose size is to be calculated. 2327 seen (set, optional): A set used internally to track visited objects 2328 and avoid circular references. Defaults to None. 2329 2330 Returns: 2331 float: An approximate size of the dictionary and its contents in bytes. 2332 2333 Note: 2334 - This function is a method of the `ZakatTracker` class and is likely used to 2335 estimate the memory footprint of data structures relevant to Zakat calculations. 2336 - The size calculation is approximate as it relies on `sys.getsizeof()`, which might 2337 not account for all memory overhead depending on the Python implementation. 2338 - Circular references are handled to prevent infinite recursion. 2339 - Basic numeric types (int, float, complex) are assumed to have fixed sizes. 2340 - String sizes are estimated based on character length and encoding. 
2341 """ 2342 size = 0 2343 if seen is None: 2344 seen = set() 2345 2346 obj_id = id(obj) 2347 if obj_id in seen: 2348 return 0 2349 2350 seen.add(obj_id) 2351 size += sys.getsizeof(obj) 2352 2353 if isinstance(obj, dict): 2354 for k, v in obj.items(): 2355 size += ZakatTracker.get_dict_size(k, seen) 2356 size += ZakatTracker.get_dict_size(v, seen) 2357 elif isinstance(obj, (list, tuple, set, frozenset)): 2358 for item in obj: 2359 size += ZakatTracker.get_dict_size(item, seen) 2360 elif isinstance(obj, (int, float, complex)): # Handle numbers 2361 pass # Basic numbers have a fixed size, so nothing to add here 2362 elif isinstance(obj, str): # Handle strings 2363 size += len(obj) * sys.getsizeof(str().encode()) # Size per character in bytes 2364 return size
Recursively calculates the approximate memory size of a dictionary and its contents in bytes.
This function traverses the dictionary structure, accounting for the size of keys, values, and any nested objects. It handles various data types commonly found in dictionaries (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case of circular references.
Parameters:
    obj (dict): The dictionary whose size is to be calculated.
    seen (set, optional): A set used internally to track visited objects and avoid circular references. Defaults to None.
Returns: float: An approximate size of the dictionary and its contents in bytes.
Note:
- This function is a method of the `ZakatTracker` class and is likely used to estimate the memory footprint of data structures relevant to Zakat calculations.
- The size calculation is approximate as it relies on `sys.getsizeof()`, which might not account for all memory overhead depending on the Python implementation.
- Circular references are handled to prevent infinite recursion.
- Basic numeric types (int, float, complex) are assumed to have fixed sizes.
- String sizes are estimated based on character length and encoding.
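The circular-reference handling mentioned above comes down to remembering the `id()` of every object already visited. A reduced sketch of that idea follows. `deep_size` is a hypothetical name, and unlike the listing it adds no extra per-character estimate for strings.

```python
import sys


def deep_size(obj, seen=None) -> int:
    """Approximate size in bytes of obj plus everything reachable from it."""
    if seen is None:
        seen = set()
    if id(obj) in seen:
        return 0  # already counted: shared object or circular reference
    seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        for key, value in obj.items():
            size += deep_size(key, seen) + deep_size(value, seen)
    elif isinstance(obj, (list, tuple, set, frozenset)):
        for item in obj:
            size += deep_size(item, seen)
    return size


vault = {'account': {'cash': {'balance': 100, 'box': {}}}}
vault['self'] = vault  # circular reference; the walk still terminates
print(deep_size(vault) > sys.getsizeof(vault))
```

The exact byte counts depend on the Python implementation and version, so the example compares sizes rather than printing one.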
    @staticmethod
    def duration_from_nanoseconds(ns: int,
                                  show_zeros_in_spoken_time: bool = False,
                                  spoken_time_separator=',',
                                  millennia: str = 'Millennia',
                                  century: str = 'Century',
                                  years: str = 'Years',
                                  days: str = 'Days',
                                  hours: str = 'Hours',
                                  minutes: str = 'Minutes',
                                  seconds: str = 'Seconds',
                                  milli_seconds: str = 'MilliSeconds',
                                  micro_seconds: str = 'MicroSeconds',
                                  nano_seconds: str = 'NanoSeconds',
                                  ) -> tuple:
        """
        REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106
        Convert nanoseconds to a human-readable time format.
        A nanosecond is a unit of time in the International System of Units (SI) equal
        to one billionth (0.000000001 or 10^-9 or 1/1,000,000,000) of a second.
        Its symbol is ns.
        A microsecond is equal to 1000 nanoseconds or 1/1,000 of a millisecond.

        INPUT : ns (nanoseconds)
        OUTPUT: tuple(string time_lapsed, string spoken_time) like format.
        OUTPUT Variables: time_lapsed, spoken_time

        Example Input: duration_from_nanoseconds(ns)
        **"Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds"**
        Example Output: ('039:0001:047:325:05:02:03::456::789::012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds')
        duration_from_nanoseconds(1234567890123456789012)
        """
        us, ns = divmod(ns, 1000)
        ms, us = divmod(us, 1000)
        s, ms = divmod(ms, 1000)
        m, s = divmod(s, 60)
        h, m = divmod(m, 60)
        d, h = divmod(h, 24)
        y, d = divmod(d, 365)
        c, y = divmod(y, 100)
        n, c = divmod(c, 10)
        time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}"
        spoken_time_part = []
        if n > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{n: 3d} {millennia}")
        if c > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{c: 4d} {century}")
        if y > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{y: 3d} {years}")
        if d > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{d: 4d} {days}")
        if h > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{h: 2d} {hours}")
        if m > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{m: 2d} {minutes}")
        if s > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{s: 2d} {seconds}")
        if ms > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{ms: 3d} {milli_seconds}")
        if us > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{us: 3d} {micro_seconds}")
        if ns > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{ns: 3d} {nano_seconds}")
        return time_lapsed, spoken_time_separator.join(spoken_time_part)
REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106
Convert nanoseconds to a human-readable time format. A nanosecond is a unit of time in the International System of Units (SI) equal to one billionth (0.000000001 or 10^-9 or 1/1,000,000,000) of a second. Its symbol is ns. A microsecond is equal to 1000 nanoseconds or 1/1,000 of a millisecond.
INPUT : ns (nanoseconds)
OUTPUT: tuple(string time_lapsed, string spoken_time) like format.
OUTPUT Variables: time_lapsed, spoken_time
Example Input: duration_from_nanoseconds(1234567890123456789012)
"Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds"
Example Output: ('039:0001:047:325:05:02:03::456::789::012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds')
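The conversion is a chain of `divmod` calls that peels off one unit at a time, from the smallest upward. A shortened sketch that stops at years is shown below. `split_nanoseconds` is a hypothetical name, and like the listing it treats every year as 365 days.

```python
def split_nanoseconds(ns: int) -> dict:
    """Break a nanosecond count into calendar-like units (365-day years)."""
    us, ns = divmod(ns, 1000)   # whole microseconds, leftover nanoseconds
    ms, us = divmod(us, 1000)   # whole milliseconds, leftover microseconds
    s, ms = divmod(ms, 1000)    # whole seconds, leftover milliseconds
    m, s = divmod(s, 60)
    h, m = divmod(m, 60)
    d, h = divmod(h, 24)
    y, d = divmod(d, 365)       # no leap days are taken into account
    return {'years': y, 'days': d, 'hours': h, 'minutes': m,
            'seconds': s, 'ms': ms, 'us': us, 'ns': ns}


# 1 day, 1 hour, 1 minute, 1 second and 123 nanoseconds
print(split_nanoseconds(90_061_000_000_123))
```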
    @staticmethod
    def day_to_time(day: int, month: int = 6, year: int = 2024) -> int:  # assumes the month is June and the year is 2024
        """
        Convert a specific day, month, and year into a timestamp.

        Parameters:
            day (int): The day of the month.
            month (int): The month of the year. Default is 6 (June).
            year (int): The year. Default is 2024.

        Returns:
            int: The timestamp representing the given day, month, and year.

        Note:
            This method assumes the default month and year if not provided.
        """
        return ZakatTracker.time(datetime.datetime(year, month, day))
Convert a specific day, month, and year into a timestamp.
Parameters:
    day (int): The day of the month.
    month (int): The month of the year. Default is 6 (June).
    year (int): The year. Default is 2024.
Returns: int: The timestamp representing the given day, month, and year.
Note: This method assumes the default month and year if not provided.
    @staticmethod
    def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime:
        """
        Generate a random date between two given dates.

        Parameters:
            start_date (datetime.datetime): The start date from which to generate a random date.
            end_date (datetime.datetime): The end date until which to generate a random date.

        Returns:
            datetime.datetime: A random date between the start_date and end_date.
        """
        time_between_dates = end_date - start_date
        days_between_dates = time_between_dates.days
        random_number_of_days = random.randrange(days_between_dates)
        return start_date + datetime.timedelta(days=random_number_of_days)
Generate a random date between two given dates.
Parameters:
    start_date (datetime.datetime): The start date from which to generate a random date.
    end_date (datetime.datetime): The end date until which to generate a random date.
Returns: datetime.datetime: A random date between the start_date and end_date.
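A standalone sketch of the same approach, using only the standard library (`random_date` is a hypothetical name). Because `random.randrange(n)` returns a value from 0 to n - 1, the result moves in whole-day steps from the start date and never reaches the end date; a range shorter than one day makes `randrange` raise `ValueError`.

```python
import datetime
import random


def random_date(start: datetime.datetime, end: datetime.datetime) -> datetime.datetime:
    """Pick a date in [start, end), a whole number of days after start."""
    days_between = (end - start).days
    # randrange(n) yields 0 .. n-1 and raises ValueError when n <= 0.
    return start + datetime.timedelta(days=random.randrange(days_between))


start = datetime.datetime(2023, 1, 1)
end = datetime.datetime(2023, 12, 31)
picked = random_date(start, end)
print(picked.strftime('%Y-%m-%d %H:%M:%S'))
```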
    @staticmethod
    def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False,
                                 debug: bool = False) -> int:
        """
        Generate a random CSV file with specified parameters.

        Parameters:
            path (str): The path where the CSV file will be saved. Default is "data.csv".
            count (int): The number of rows to generate in the CSV file. Default is 1000.
            with_rate (bool): If True, a random rate between 0.12 and 12.0 (a multiple of 0.12) is added. Default is False.
            debug (bool): A flag indicating whether to print debug information.

        Returns:
            int: The number of rows written. The function generates a CSV file at the specified path with the given count of rows.
            Each row contains a randomly generated account, description, value, and date.
            The value is randomly generated between 1000 and 100000,
            and the date is randomly generated between 1000-01-01 and 2023-12-31.
            If the row number is not divisible by 13, the value is multiplied by -1.
        """
        if debug:
            print('generate_random_csv_file', f'debug={debug}')
        i = 0
        with open(path, "w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            for i in range(count):
                account = f"acc-{random.randint(1, 1000)}"
                desc = f"Some text {random.randint(1, 1000)}"
                value = random.randint(1000, 100000)
                date = ZakatTracker.generate_random_date(datetime.datetime(1000, 1, 1),
                                                         datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S")
                if not i % 13 == 0:
                    value *= -1
                row = [account, desc, value, date]
                if with_rate:
                    rate = random.randint(1, 100) * 0.12
                    if debug:
                        print('before-append', row)
                    row.append(rate)
                    if debug:
                        print('after-append', row)
                writer.writerow(row)
                i = i + 1
        return i
Generate a random CSV file with specified parameters.
Parameters:
    path (str): The path where the CSV file will be saved. Default is "data.csv".
    count (int): The number of rows to generate in the CSV file. Default is 1000.
    with_rate (bool): If True, a random rate between 0.12 and 12 is appended to each row. Default is False.
    debug (bool): A flag indicating whether to print debug information.
Returns:
    int: The number of rows written. The function generates a CSV file at the specified path with the given count of rows. Each row contains a randomly generated account, description, value, and date. The value is randomly generated between 1000 and 100000, and the date is randomly generated between 1000-01-01 and 2023-12-31. If the zero-based row index is not divisible by 13, the value is multiplied by -1.
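To make the row layout and the sign rule concrete, here is a small standalone sketch that writes rows in the same shape to an in-memory buffer and reads them back. The function `write_sample_rows` is a hypothetical stand-in rather than the library method, and it uses a fixed placeholder date instead of a random one.

```python
import csv
import io
import random


def write_sample_rows(stream, count: int, with_rate: bool = False) -> int:
    """Hypothetical stand-in that mirrors the documented row layout."""
    writer = csv.writer(stream)
    written = 0
    for i in range(count):
        value = random.randint(1000, 100000)
        if i % 13 != 0:
            value *= -1  # only rows 0, 13, 26, ... stay positive
        row = [
            f"acc-{random.randint(1, 1000)}",
            f"Some text {random.randint(1, 1000)}",
            value,
            "2023-12-31 00:00:00",  # placeholder date (assumption)
        ]
        if with_rate:
            row.append(random.randint(1, 100) * 0.12)
        writer.writerow(row)
        written += 1
    return written


buffer = io.StringIO()
written = write_sample_rows(buffer, 26, with_rate=True)
rows = list(csv.reader(io.StringIO(buffer.getvalue())))
positive = [r for r in rows if int(r[2]) > 0]
print(written, len(rows), len(positive))
```

With 26 rows, only the rows at index 0 and 13 carry a positive value, so a file generated this way is dominated by negative transactions.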
    @staticmethod
    def create_random_list(max_sum, min_value=0, max_value=10):
        """
        Creates a list of random integers whose sum does not exceed the specified maximum.

        Args:
            max_sum: The maximum allowed sum of the list elements.
            min_value: The minimum possible value for an element (inclusive).
            max_value: The maximum possible value for an element (inclusive).

        Returns:
            A list of random integers.
        """
        result = []
        current_sum = 0

        while current_sum < max_sum:
            # Calculate the remaining space for the next element
            remaining_sum = max_sum - current_sum
            # Determine the maximum possible value for the next element
            next_max_value = min(remaining_sum, max_value)
            # Generate a random element within the allowed range
            next_element = random.randint(min_value, next_max_value)
            result.append(next_element)
            current_sum += next_element

        return result
Creates a list of random integers whose sum does not exceed the specified maximum.
Args:
    max_sum: The maximum allowed sum of the list elements.
    min_value: The minimum possible value for an element (inclusive).
    max_value: The maximum possible value for an element (inclusive).
Returns:
    A list of random integers.
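One consequence of the loop is worth spelling out: it only stops once the running sum reaches `max_sum`, and each element is capped at the remaining amount, so with the default `min_value=0` the returned list sums to exactly `max_sum`. The standalone copy below (renamed `random_parts` for illustration; it is not imported from the library) checks this.

```python
import random


def random_parts(max_sum: int, min_value: int = 0, max_value: int = 10) -> list:
    """Standalone copy of the documented loop, for illustration only."""
    result = []
    current_sum = 0
    while current_sum < max_sum:
        # Never draw more than what is still missing from the target sum.
        cap = min(max_sum - current_sum, max_value)
        element = random.randint(min_value, cap)
        result.append(element)
        current_sum += element
    return result


parts = random_parts(50100, max_value=1000)
total = sum(parts)
print(len(parts), total)
```

Note that `random.randint` raises `ValueError` when `min_value` is larger than the remaining amount, so a non-zero `min_value` can make the last draw fail.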
2794 def test(self, debug: bool = False) -> bool: 2795 if debug: 2796 print('test', f'debug={debug}') 2797 try: 2798 2799 self._test_core(True, debug) 2800 self._test_core(False, debug) 2801 2802 assert self._history() 2803 2804 # Not allowed for duplicate transactions in the same account and time 2805 2806 created = ZakatTracker.time() 2807 self.track(100, 'test-1', 'same', True, created) 2808 failed = False 2809 try: 2810 self.track(50, 'test-1', 'same', True, created) 2811 except: 2812 failed = True 2813 assert failed is True 2814 2815 self.reset() 2816 2817 # Same account transfer 2818 for x in [1, 'a', True, 1.8, None]: 2819 failed = False 2820 try: 2821 self.transfer(1, x, x, 'same-account', debug=debug) 2822 except: 2823 failed = True 2824 assert failed is True 2825 2826 # Always preserve box age during transfer 2827 2828 series: list[tuple] = [ 2829 (30, 4), 2830 (60, 3), 2831 (90, 2), 2832 ] 2833 case = { 2834 3000: { 2835 'series': series, 2836 'rest': 15000, 2837 }, 2838 6000: { 2839 'series': series, 2840 'rest': 12000, 2841 }, 2842 9000: { 2843 'series': series, 2844 'rest': 9000, 2845 }, 2846 18000: { 2847 'series': series, 2848 'rest': 0, 2849 }, 2850 27000: { 2851 'series': series, 2852 'rest': -9000, 2853 }, 2854 36000: { 2855 'series': series, 2856 'rest': -18000, 2857 }, 2858 } 2859 2860 selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle() 2861 2862 for total in case: 2863 if debug: 2864 print('--------------------------------------------------------') 2865 print(f'case[{total}]', case[total]) 2866 for x in case[total]['series']: 2867 self.track( 2868 unscaled_value=x[0], 2869 desc=f"test-{x} ages", 2870 account='ages', 2871 logging=True, 2872 created=selected_time * x[1], 2873 ) 2874 2875 unscaled_total = self.unscale(total) 2876 if debug: 2877 print('unscaled_total', unscaled_total) 2878 refs = self.transfer( 2879 unscaled_amount=unscaled_total, 2880 from_account='ages', 2881 to_account='future', 2882 desc='Zakat Movement', 2883 
debug=debug, 2884 ) 2885 2886 if debug: 2887 print('refs', refs) 2888 2889 ages_cache_balance = self.balance('ages') 2890 ages_fresh_balance = self.balance('ages', False) 2891 rest = case[total]['rest'] 2892 if debug: 2893 print('source', ages_cache_balance, ages_fresh_balance, rest) 2894 assert ages_cache_balance == rest 2895 assert ages_fresh_balance == rest 2896 2897 future_cache_balance = self.balance('future') 2898 future_fresh_balance = self.balance('future', False) 2899 if debug: 2900 print('target', future_cache_balance, future_fresh_balance, total) 2901 print('refs', refs) 2902 assert future_cache_balance == total 2903 assert future_fresh_balance == total 2904 2905 # TODO: check boxes times for `ages` should equal box times in `future` 2906 for ref in self._vault['account']['ages']['box']: 2907 ages_capital = self._vault['account']['ages']['box'][ref]['capital'] 2908 ages_rest = self._vault['account']['ages']['box'][ref]['rest'] 2909 future_capital = 0 2910 if ref in self._vault['account']['future']['box']: 2911 future_capital = self._vault['account']['future']['box'][ref]['capital'] 2912 future_rest = 0 2913 if ref in self._vault['account']['future']['box']: 2914 future_rest = self._vault['account']['future']['box'][ref]['rest'] 2915 if ages_capital != 0 and future_capital != 0 and future_rest != 0: 2916 if debug: 2917 print('================================================================') 2918 print('ages', ages_capital, ages_rest) 2919 print('future', future_capital, future_rest) 2920 if ages_rest == 0: 2921 assert ages_capital == future_capital 2922 elif ages_rest < 0: 2923 assert -ages_capital == future_capital 2924 elif ages_rest > 0: 2925 assert ages_capital == ages_rest + future_capital 2926 self.reset() 2927 assert len(self._vault['history']) == 0 2928 2929 assert self._history() 2930 assert self._history(False) is False 2931 assert self._history() is False 2932 assert self._history(True) 2933 assert self._history() 2934 if debug: 2935 
print('####################################################################') 2936 2937 transaction = [ 2938 ( 2939 20, 'wallet', 1, -2000, -2000, -2000, 1, 1, 2940 2000, 2000, 2000, 1, 1, 2941 ), 2942 ( 2943 750, 'wallet', 'safe', -77000, -77000, -77000, 2, 2, 2944 75000, 75000, 75000, 1, 1, 2945 ), 2946 ( 2947 600, 'safe', 'bank', 15000, 15000, 15000, 1, 2, 2948 60000, 60000, 60000, 1, 1, 2949 ), 2950 ] 2951 for z in transaction: 2952 self.lock() 2953 x = z[1] 2954 y = z[2] 2955 self.transfer( 2956 unscaled_amount=z[0], 2957 from_account=x, 2958 to_account=y, 2959 desc='test-transfer', 2960 debug=debug, 2961 ) 2962 zz = self.balance(x) 2963 if debug: 2964 print(zz, z) 2965 assert zz == z[3] 2966 xx = self.accounts()[x] 2967 assert xx == z[3] 2968 assert self.balance(x, False) == z[4] 2969 assert xx == z[4] 2970 2971 s = 0 2972 log = self._vault['account'][x]['log'] 2973 for i in log: 2974 s += log[i]['value'] 2975 if debug: 2976 print('s', s, 'z[5]', z[5]) 2977 assert s == z[5] 2978 2979 assert self.box_size(x) == z[6] 2980 assert self.log_size(x) == z[7] 2981 2982 yy = self.accounts()[y] 2983 assert self.balance(y) == z[8] 2984 assert yy == z[8] 2985 assert self.balance(y, False) == z[9] 2986 assert yy == z[9] 2987 2988 s = 0 2989 log = self._vault['account'][y]['log'] 2990 for i in log: 2991 s += log[i]['value'] 2992 assert s == z[10] 2993 2994 assert self.box_size(y) == z[11] 2995 assert self.log_size(y) == z[12] 2996 assert self.free(self.lock()) 2997 2998 if debug: 2999 pp().pprint(self.check(2.17)) 3000 3001 assert not self.nolock() 3002 history_count = len(self._vault['history']) 3003 if debug: 3004 print('history-count', history_count) 3005 assert history_count == 4 3006 assert not self.free(ZakatTracker.time()) 3007 assert self.free(self.lock()) 3008 assert self.nolock() 3009 assert len(self._vault['history']) == 3 3010 3011 # storage 3012 3013 _path = self.path(f'./zakat_test_db/test.{self.ext()}') 3014 if os.path.exists(_path): 3015 os.remove(_path) 
3016 self.save() 3017 assert os.path.getsize(_path) > 0 3018 self.reset() 3019 assert self.recall(False, debug) is False 3020 self.load() 3021 assert self._vault['account'] is not None 3022 3023 # recall 3024 3025 assert self.nolock() 3026 assert len(self._vault['history']) == 3 3027 assert self.recall(False, debug) is True 3028 assert len(self._vault['history']) == 2 3029 assert self.recall(False, debug) is True 3030 assert len(self._vault['history']) == 1 3031 assert self.recall(False, debug) is True 3032 assert len(self._vault['history']) == 0 3033 assert self.recall(False, debug) is False 3034 assert len(self._vault['history']) == 0 3035 3036 # exchange 3037 3038 self.exchange("cash", 25, 3.75, "2024-06-25") 3039 self.exchange("cash", 22, 3.73, "2024-06-22") 3040 self.exchange("cash", 15, 3.69, "2024-06-15") 3041 self.exchange("cash", 10, 3.66) 3042 3043 for i in range(1, 30): 3044 exchange = self.exchange("cash", i) 3045 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 3046 if debug: 3047 print(i, rate, description, created) 3048 assert created 3049 if i < 10: 3050 assert rate == 1 3051 assert description is None 3052 elif i == 10: 3053 assert rate == 3.66 3054 assert description is None 3055 elif i < 15: 3056 assert rate == 3.66 3057 assert description is None 3058 elif i == 15: 3059 assert rate == 3.69 3060 assert description is not None 3061 elif i < 22: 3062 assert rate == 3.69 3063 assert description is not None 3064 elif i == 22: 3065 assert rate == 3.73 3066 assert description is not None 3067 elif i >= 25: 3068 assert rate == 3.75 3069 assert description is not None 3070 exchange = self.exchange("bank", i) 3071 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 3072 if debug: 3073 print(i, rate, description, created) 3074 assert created 3075 assert rate == 1 3076 assert description is None 3077 3078 assert len(self._vault['exchange']) > 0 3079 assert len(self.exchanges()) > 0 
3080 self._vault['exchange'].clear() 3081 assert len(self._vault['exchange']) == 0 3082 assert len(self.exchanges()) == 0 3083 3084 # حفظ أسعار الصرف باستخدام التواريخ بالنانو ثانية 3085 self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25") 3086 self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22") 3087 self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15") 3088 self.exchange("cash", ZakatTracker.day_to_time(10), 3.66) 3089 3090 for i in [x * 0.12 for x in range(-15, 21)]: 3091 if i <= 0: 3092 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0 3093 else: 3094 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0 3095 3096 # اختبار النتائج باستخدام التواريخ بالنانو ثانية 3097 for i in range(1, 31): 3098 timestamp_ns = ZakatTracker.day_to_time(i) 3099 exchange = self.exchange("cash", timestamp_ns) 3100 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 3101 if debug: 3102 print(i, rate, description, created) 3103 assert created 3104 if i < 10: 3105 assert rate == 1 3106 assert description is None 3107 elif i == 10: 3108 assert rate == 3.66 3109 assert description is None 3110 elif i < 15: 3111 assert rate == 3.66 3112 assert description is None 3113 elif i == 15: 3114 assert rate == 3.69 3115 assert description is not None 3116 elif i < 22: 3117 assert rate == 3.69 3118 assert description is not None 3119 elif i == 22: 3120 assert rate == 3.73 3121 assert description is not None 3122 elif i >= 25: 3123 assert rate == 3.75 3124 assert description is not None 3125 exchange = self.exchange("bank", i) 3126 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 3127 if debug: 3128 print(i, rate, description, created) 3129 assert created 3130 assert rate == 1 3131 assert description is None 3132 3133 # csv 3134 3135 csv_count = 1000 3136 3137 for with_rate, path in { 3138 False: 
'test-import_csv-no-exchange', 3139 True: 'test-import_csv-with-exchange', 3140 }.items(): 3141 3142 if debug: 3143 print('test_import_csv', with_rate, path) 3144 3145 csv_path = path + '.csv' 3146 if os.path.exists(csv_path): 3147 os.remove(csv_path) 3148 c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug) 3149 if debug: 3150 print('generate_random_csv_file', c) 3151 assert c == csv_count 3152 assert os.path.getsize(csv_path) > 0 3153 cache_path = self.import_csv_cache_path() 3154 if os.path.exists(cache_path): 3155 os.remove(cache_path) 3156 self.reset() 3157 (created, found, bad) = self.import_csv(csv_path, debug) 3158 bad_count = len(bad) 3159 assert bad_count > 0 3160 if debug: 3161 print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})") 3162 print('bad', bad) 3163 tmp_size = os.path.getsize(cache_path) 3164 assert tmp_size > 0 3165 # TODO: assert created + found + bad_count == csv_count 3166 # TODO: assert created == csv_count 3167 # TODO: assert bad_count == 0 3168 (created_2, found_2, bad_2) = self.import_csv(csv_path) 3169 bad_2_count = len(bad_2) 3170 if debug: 3171 print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})") 3172 print('bad', bad) 3173 assert bad_2_count > 0 3174 # TODO: assert tmp_size == os.path.getsize(cache_path) 3175 # TODO: assert created_2 + found_2 + bad_2_count == csv_count 3176 # TODO: assert created == found_2 3177 # TODO: assert bad_count == bad_2_count 3178 # TODO: assert found_2 == csv_count 3179 # TODO: assert bad_2_count == 0 3180 # TODO: assert created_2 == 0 3181 3182 # payment parts 3183 3184 positive_parts = self.build_payment_parts(100, positive_only=True) 3185 assert self.check_payment_parts(positive_parts) != 0 3186 assert self.check_payment_parts(positive_parts) != 0 3187 all_parts = self.build_payment_parts(300, positive_only=False) 3188 assert self.check_payment_parts(all_parts) != 0 3189 assert self.check_payment_parts(all_parts) != 0 3190 if debug: 3191 
pp().pprint(positive_parts) 3192 pp().pprint(all_parts) 3193 # dynamic discount 3194 suite = [] 3195 count = 3 3196 for exceed in [False, True]: 3197 case = [] 3198 for parts in [positive_parts, all_parts]: 3199 part = parts.copy() 3200 demand = part['demand'] 3201 if debug: 3202 print(demand, part['total']) 3203 i = 0 3204 z = demand / count 3205 cp = { 3206 'account': {}, 3207 'demand': demand, 3208 'exceed': exceed, 3209 'total': part['total'], 3210 } 3211 j = '' 3212 for x, y in part['account'].items(): 3213 x_exchange = self.exchange(x) 3214 zz = self.exchange_calc(z, 1, x_exchange['rate']) 3215 if exceed and zz <= demand: 3216 i += 1 3217 y['part'] = zz 3218 if debug: 3219 print(exceed, y) 3220 cp['account'][x] = y 3221 case.append(y) 3222 elif not exceed and y['balance'] >= zz: 3223 i += 1 3224 y['part'] = zz 3225 if debug: 3226 print(exceed, y) 3227 cp['account'][x] = y 3228 case.append(y) 3229 j = x 3230 if i >= count: 3231 break 3232 if len(cp['account'][j]) > 0: 3233 suite.append(cp) 3234 if debug: 3235 print('suite', len(suite)) 3236 # vault = self._vault.copy() 3237 for case in suite: 3238 # self._vault = vault.copy() 3239 if debug: 3240 print('case', case) 3241 result = self.check_payment_parts(case) 3242 if debug: 3243 print('check_payment_parts', result, f'exceed: {exceed}') 3244 assert result == 0 3245 3246 report = self.check(2.17, None, debug) 3247 (valid, brief, plan) = report 3248 if debug: 3249 print('valid', valid) 3250 zakat_result = self.zakat(report, parts=case, debug=debug) 3251 if debug: 3252 print('zakat-result', zakat_result) 3253 assert valid == zakat_result 3254 3255 assert self.save(path + f'.{self.ext()}') 3256 assert self.export_json(path + '.json') 3257 3258 assert self.export_json("1000-transactions-test.json") 3259 assert self.save(f"1000-transactions-test.{self.ext()}") 3260 3261 self.reset() 3262 3263 # test transfer between accounts with different exchange rate 3264 3265 a_SAR = "Bank (SAR)" 3266 b_USD = "Bank (USD)" 3267 
c_SAR = "Safe (SAR)" 3268 # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer 3269 for case in [ 3270 (0, a_SAR, "SAR Gift", 1000, 100000), 3271 (1, a_SAR, 1), 3272 (0, b_USD, "USD Gift", 500, 50000), 3273 (1, b_USD, 1), 3274 (2, b_USD, 3.75), 3275 (1, b_USD, 3.75), 3276 (3, 100, b_USD, a_SAR, "100 USD -> SAR", 40000, 137500), 3277 (0, c_SAR, "Salary", 750, 75000), 3278 (3, 375, c_SAR, b_USD, "375 SAR -> USD", 37500, 50000), 3279 (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 137125, 50100), 3280 ]: 3281 if debug: 3282 print('case', case) 3283 match (case[0]): 3284 case 0: # track 3285 _, account, desc, x, balance = case 3286 self.track(unscaled_value=x, desc=desc, account=account, debug=debug) 3287 3288 cached_value = self.balance(account, cached=True) 3289 fresh_value = self.balance(account, cached=False) 3290 if debug: 3291 print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value) 3292 assert cached_value == balance 3293 assert fresh_value == balance 3294 case 1: # check-exchange 3295 _, account, expected_rate = case 3296 t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 3297 if debug: 3298 print('t-exchange', t_exchange) 3299 assert t_exchange['rate'] == expected_rate 3300 case 2: # do-exchange 3301 _, account, rate = case 3302 self.exchange(account, rate=rate, debug=debug) 3303 b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 3304 if debug: 3305 print('b-exchange', b_exchange) 3306 assert b_exchange['rate'] == rate 3307 case 3: # transfer 3308 _, x, a, b, desc, a_balance, b_balance = case 3309 self.transfer(x, a, b, desc, debug=debug) 3310 3311 cached_value = self.balance(a, cached=True) 3312 fresh_value = self.balance(a, cached=False) 3313 if debug: 3314 print( 3315 'account', a, 3316 'cached_value', cached_value, 3317 'fresh_value', fresh_value, 3318 'a_balance', a_balance, 3319 ) 3320 assert cached_value == a_balance 3321 assert fresh_value == a_balance 3322 3323 
cached_value = self.balance(b, cached=True) 3324 fresh_value = self.balance(b, cached=False) 3325 if debug: 3326 print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value) 3327 assert cached_value == b_balance 3328 assert fresh_value == b_balance 3329 3330 # Transfer all in many chunks randomly from B to A 3331 a_SAR_balance = 137125 3332 b_USD_balance = 50100 3333 b_USD_exchange = self.exchange(b_USD) 3334 amounts = ZakatTracker.create_random_list(b_USD_balance, max_value=1000) 3335 if debug: 3336 print('amounts', amounts) 3337 i = 0 3338 for x in amounts: 3339 if debug: 3340 print(f'{i} - transfer-with-exchange({x})') 3341 self.transfer( 3342 unscaled_amount=self.unscale(x), 3343 from_account=b_USD, 3344 to_account=a_SAR, 3345 desc=f"{x} USD -> SAR", 3346 debug=debug, 3347 ) 3348 3349 b_USD_balance -= x 3350 cached_value = self.balance(b_USD, cached=True) 3351 fresh_value = self.balance(b_USD, cached=False) 3352 if debug: 3353 print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3354 b_USD_balance) 3355 assert cached_value == b_USD_balance 3356 assert fresh_value == b_USD_balance 3357 3358 a_SAR_balance += int(x * b_USD_exchange['rate']) 3359 cached_value = self.balance(a_SAR, cached=True) 3360 fresh_value = self.balance(a_SAR, cached=False) 3361 if debug: 3362 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3363 a_SAR_balance, 'rate', b_USD_exchange['rate']) 3364 assert cached_value == a_SAR_balance 3365 assert fresh_value == a_SAR_balance 3366 i += 1 3367 3368 # Transfer all in many chunks randomly from C to A 3369 c_SAR_balance = 37500 3370 amounts = ZakatTracker.create_random_list(c_SAR_balance, max_value=1000) 3371 if debug: 3372 print('amounts', amounts) 3373 i = 0 3374 for x in amounts: 3375 if debug: 3376 print(f'{i} - transfer-with-exchange({x})') 3377 self.transfer( 3378 unscaled_amount=self.unscale(x), 3379 from_account=c_SAR, 3380 
to_account=a_SAR, 3381 desc=f"{x} SAR -> a_SAR", 3382 debug=debug, 3383 ) 3384 3385 c_SAR_balance -= x 3386 cached_value = self.balance(c_SAR, cached=True) 3387 fresh_value = self.balance(c_SAR, cached=False) 3388 if debug: 3389 print('account', c_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3390 c_SAR_balance) 3391 assert cached_value == c_SAR_balance 3392 assert fresh_value == c_SAR_balance 3393 3394 a_SAR_balance += x 3395 cached_value = self.balance(a_SAR, cached=True) 3396 fresh_value = self.balance(a_SAR, cached=False) 3397 if debug: 3398 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3399 a_SAR_balance) 3400 assert cached_value == a_SAR_balance 3401 assert fresh_value == a_SAR_balance 3402 i += 1 3403 3404 assert self.export_json("accounts-transfer-with-exchange-rates.json") 3405 assert self.save(f"accounts-transfer-with-exchange-rates.{self.ext()}") 3406 3407 # check & zakat with exchange rates for many cycles 3408 3409 for rate, values in { 3410 1: { 3411 'in': [1000, 2000, 10000], 3412 'exchanged': [100000, 200000, 1000000], 3413 'out': [2500, 5000, 73140], 3414 }, 3415 3.75: { 3416 'in': [200, 1000, 5000], 3417 'exchanged': [75000, 375000, 1875000], 3418 'out': [1875, 9375, 137138], 3419 }, 3420 }.items(): 3421 a, b, c = values['in'] 3422 m, n, o = values['exchanged'] 3423 x, y, z = values['out'] 3424 if debug: 3425 print('rate', rate, 'values', values) 3426 for case in [ 3427 (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3428 {'safe': {0: {'below_nisab': x}}}, 3429 ], False, m), 3430 (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3431 {'safe': {0: {'count': 1, 'total': y}}}, 3432 ], True, n), 3433 (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [ 3434 {'cave': {0: {'count': 3, 'total': z}}}, 3435 ], True, o), 3436 ]: 3437 if debug: 3438 print(f"############# check(rate: {rate}) #############") 3439 print('case', case) 3440 
self.reset() 3441 self.exchange(account=case[1], created=case[2], rate=rate) 3442 self.track( 3443 unscaled_value=case[0], 3444 desc='test-check', 3445 account=case[1], 3446 logging=True, 3447 created=case[2], 3448 ) 3449 assert self.snapshot() 3450 3451 # assert self.nolock() 3452 # history_size = len(self._vault['history']) 3453 # print('history_size', history_size) 3454 # assert history_size == 2 3455 assert self.lock() 3456 assert not self.nolock() 3457 report = self.check(2.17, None, debug) 3458 (valid, brief, plan) = report 3459 if debug: 3460 print('brief', brief) 3461 assert valid == case[4] 3462 assert case[5] == brief[0] 3463 assert case[5] == brief[1] 3464 3465 if debug: 3466 pp().pprint(plan) 3467 3468 for x in plan: 3469 assert case[1] == x 3470 if 'total' in case[3][0][x][0].keys(): 3471 assert case[3][0][x][0]['total'] == int(brief[2]) 3472 assert int(plan[x][0]['total']) == case[3][0][x][0]['total'] 3473 assert int(plan[x][0]['count']) == case[3][0][x][0]['count'] 3474 else: 3475 assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab'] 3476 if debug: 3477 pp().pprint(report) 3478 result = self.zakat(report, debug=debug) 3479 if debug: 3480 print('zakat-result', result, case[4]) 3481 assert result == case[4] 3482 report = self.check(2.17, None, debug) 3483 (valid, brief, plan) = report 3484 assert valid is False 3485 3486 history_size = len(self._vault['history']) 3487 if debug: 3488 print('history_size', history_size) 3489 assert history_size == 3 3490 assert not self.nolock() 3491 assert self.recall(False, debug) is False 3492 self.free(self.lock()) 3493 assert self.nolock() 3494 3495 for i in range(3, 0, -1): 3496 history_size = len(self._vault['history']) 3497 if debug: 3498 print('history_size', history_size) 3499 assert history_size == i 3500 assert self.recall(False, debug) is True 3501 3502 assert self.nolock() 3503 assert self.recall(False, debug) is False 3504 3505 history_size = len(self._vault['history']) 3506 if debug: 3507 
print('history_size', history_size) 3508 assert history_size == 0 3509 3510 account_size = len(self._vault['account']) 3511 if debug: 3512 print('account_size', account_size) 3513 assert account_size == 0 3514 3515 report_size = len(self._vault['report']) 3516 if debug: 3517 print('report_size', report_size) 3518 assert report_size == 0 3519 3520 assert self.nolock() 3521 return True 3522 except Exception as e: 3523 # pp().pprint(self._vault) 3524 assert self.export_json("test-snapshot.json") 3525 assert self.save(f"test-snapshot.{self.ext()}") 3526 raise e
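The exchange assertions in the test above imply a lookup rule: a query returns the most recent rate recorded at or before the requested time, and falls back to a rate of 1 when nothing earlier exists. The sketch below reproduces that rule with `bisect`. The class `RateHistory` is hypothetical and is not how the library stores exchange rates; the result for days 23 and 24, which the test does not assert, is an assumption.

```python
import bisect


class RateHistory:
    """Hypothetical sketch of a 'latest rate at or before t' lookup."""

    def __init__(self):
        self._times = []  # sorted timestamps
        self._rates = []  # rates aligned with self._times

    def record(self, created: int, rate: float) -> None:
        index = bisect.bisect_left(self._times, created)
        if index < len(self._times) and self._times[index] == created:
            self._rates[index] = rate  # overwrite an existing entry
        else:
            self._times.insert(index, created)
            self._rates.insert(index, rate)

    def lookup(self, created: int) -> float:
        # Number of recorded timestamps that are <= created.
        index = bisect.bisect_right(self._times, created)
        return self._rates[index - 1] if index else 1


history = RateHistory()
for day, rate in [(25, 3.75), (22, 3.73), (15, 3.69), (10, 3.66)]:
    history.record(day, rate)

print([history.lookup(day) for day in (9, 10, 14, 15, 22, 25, 29)])
```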
def test(debug: bool = False):
    ledger = ZakatTracker("./zakat_test_db/zakat.camel")
    start = ZakatTracker.time()
    assert ledger.test(debug=debug)
    if debug:
        print("#########################")
        print("######## TEST DONE ########")
        print("#########################")
        print(ZakatTracker.duration_from_nanoseconds(ZakatTracker.time() - start))
        print("#########################")
class Action(Enum):
    CREATE = auto()
    TRACK = auto()
    LOG = auto()
    SUB = auto()
    ADD_FILE = auto()
    REMOVE_FILE = auto()
    BOX_TRANSFER = auto()
    EXCHANGE = auto()
    REPORT = auto()
    ZAKAT = auto()
class JSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (Action, MathOperation)):
            return obj.name  # Serialize as the enum member's name
        elif isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)
Extensible JSON https://json.org encoder for Python data structures.
Supports the following objects and types by default:
+-------------------+---------------+
| Python            | JSON          |
+===================+===============+
| dict              | object        |
+-------------------+---------------+
| list, tuple       | array         |
+-------------------+---------------+
| str               | string        |
+-------------------+---------------+
| int, float        | number        |
+-------------------+---------------+
| True              | true          |
+-------------------+---------------+
| False             | false         |
+-------------------+---------------+
| None              | null          |
+-------------------+---------------+
To extend this to recognize other objects, subclass and implement a `.default()` method that returns a serializable object for `o` if possible; otherwise it should call the superclass implementation (to raise `TypeError`).
    def default(self, obj):
        if isinstance(obj, (Action, MathOperation)):
            return obj.name  # Serialize as the enum member's name
        elif isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)
Implement this method in a subclass such that it returns a serializable object for `o`, or calls the base implementation (to raise a `TypeError`).
For example, to support arbitrary iterators, you could implement default like this::
    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return super().default(o)
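The same extension pattern is what the library's `JSONEncoder` applies to enum members and `Decimal` values. A minimal self-contained sketch follows. `SampleAction` and `SampleEncoder` are hypothetical stand-ins so the example runs without the `zakat` package installed.

```python
import json
from decimal import Decimal
from enum import Enum, auto


class SampleAction(Enum):  # hypothetical stand-in for the library's Action enum
    TRACK = auto()


class SampleEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, Enum):
            return o.name  # serialize enum members by name
        if isinstance(o, Decimal):
            return float(o)  # note: may lose precision for large decimals
        # Anything else falls through to the base class, which raises TypeError.
        return super().default(o)


encoded = json.dumps(
    {"action": SampleAction.TRACK, "value": Decimal("1.5")},
    cls=SampleEncoder,
)
print(encoded)  # {"action": "TRACK", "value": 1.5}
```

Passing the encoder through `cls=` keeps the call site unchanged for ordinary types, since `default` is only consulted for objects the base encoder cannot handle.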
class WeekDay(Enum):
    Monday = 0
    Tuesday = 1
    Wednesday = 2
    Thursday = 3
    Friday = 4
    Saturday = 5
    Sunday = 6
def start_file_server(database_path: str, database_callback: callable = None, csv_callback: callable = None,
                      debug: bool = False) -> tuple:
    """
    Starts a multi-purpose HTTP server to manage file interactions for a Zakat application.

    This server facilitates the following functionalities:

    1. GET /{file_uuid}/get: Download the database file specified by `database_path`.
    2. GET /{file_uuid}/upload: Display an HTML form for uploading files.
    3. POST /{file_uuid}/upload: Handle file uploads, distinguishing between:
        - Database File (.db): Replaces the existing database with the uploaded one.
        - CSV File (.csv): Imports data from the CSV into the existing database.

    Args:
        database_path (str): The path to the pickle database file.
        database_callback (callable, optional): A function to call after a successful database upload.
                                                It receives the uploaded database path as its argument.
        csv_callback (callable, optional): A function to call after a successful CSV upload. It receives the uploaded CSV path,
                                           the database path, and the debug flag as its arguments.
        debug (bool, optional): If True, print debugging information. Defaults to False.

    Returns:
        Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing:
            - file_name (str): The name of the database file.
            - download_url (str): The URL to download the database file.
            - upload_url (str): The URL to access the file upload form.
            - server_thread (threading.Thread): The thread running the server.
            - shutdown_server (Callable[[], None]): A function to gracefully shut down the server.

    Example:
        _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db")
        print(f"Download database: {download_url}")
        print(f"Upload files: {upload_url}")
        server_thread.start()
        # ... later ...
        shutdown_server()
    """
    file_uuid = uuid.uuid4()
    file_name = os.path.basename(database_path)

    port = find_available_port()
    download_url = f"http://localhost:{port}/{file_uuid}/get"
    upload_url = f"http://localhost:{port}/{file_uuid}/upload"

    class Handler(http.server.SimpleHTTPRequestHandler):
        def do_GET(self):
            if self.path == f"/{file_uuid}/get":
                # GET: Serve the existing file
                try:
                    with open(database_path, "rb") as f:
                        self.send_response(200)
                        self.send_header("Content-type", "application/octet-stream")
                        self.send_header("Content-Disposition", f'attachment; filename="{file_name}"')
                        self.end_headers()
                        self.wfile.write(f.read())
                except FileNotFoundError:
                    self.send_error(404, "File not found")
            elif self.path == f"/{file_uuid}/upload":
                # GET: Serve the upload form
                self.send_response(200)
                self.send_header("Content-type", "text/html")
                self.end_headers()
                self.wfile.write(f"""
                    <html lang="en">
                        <head>
                            <title>Zakat File Server</title>
                        </head>
                    <body>
                    <h1>Zakat File Server</h1>
                    <h3>You can download the <a target="__blank" href="{download_url}">database file</a>...</h3>
                    <h3>Or upload a new file to restore a database or import `CSV` file:</h3>
                    <form action="/{file_uuid}/upload" method="post" enctype="multipart/form-data">
                        <input type="file" name="file" required><br/>
                        <input type="radio" id="{FileType.Database.value}" name="upload_type" value="{FileType.Database.value}" required>
                        <label for="database">Database File</label><br/>
                        <input type="radio" id="{FileType.CSV.value}" name="upload_type" value="{FileType.CSV.value}">
                        <label for="csv">CSV File</label><br/>
                        <input type="submit" value="Upload"><br/>
                    </form>
                    </body></html>
                """.encode())
            else:
                self.send_error(404)

        def do_POST(self):
            if self.path == f"/{file_uuid}/upload":
                # POST: Handle request
                # 1. Get the Form Data
                form_data = cgi.FieldStorage(
                    fp=self.rfile,
                    headers=self.headers,
                    environ={'REQUEST_METHOD': 'POST'}
                )
                upload_type = form_data.getvalue("upload_type")

                if debug:
                    print('upload_type', upload_type)

                if upload_type not in [FileType.Database.value, FileType.CSV.value]:
                    self.send_error(400, "Invalid upload type")
                    return

                # 2. Extract File Data
                file_item = form_data['file']  # Assuming 'file' is your file input name

                # 3. Get File Details
                filename = file_item.filename
                file_data = file_item.file.read()  # Read the file's content

                if debug:
                    print(f'Uploaded filename: {filename}')

                # 4. Define Storage Path for CSV
                upload_directory = "./uploads"  # Create this directory if it doesn't exist
                os.makedirs(upload_directory, exist_ok=True)
                file_path = os.path.join(upload_directory, upload_type)

                # 5. Write to Disk
                with open(file_path, 'wb') as f:
                    f.write(file_data)

                match upload_type:
                    case FileType.Database.value:

                        try:
                            # 6. Verify database file
                            # ZakatTracker(db_path=file_path) # FATAL, Circular Imports Error
                            if database_callback is not None:
                                database_callback(file_path)

                            # 7. Copy database into the original path
                            shutil.copy2(file_path, database_path)
                        except Exception as e:
                            self.send_error(400, str(e))
                            return

                    case FileType.CSV.value:
                        # 6. Verify CSV file
                        try:
                            # x = ZakatTracker(db_path=database_path) # FATAL, Circular Imports Error
                            # result = x.import_csv(file_path, debug=debug)
                            if csv_callback is not None:
                                result = csv_callback(file_path, database_path, debug)
                                if debug:
                                    print(f'CSV imported: {result}')
                                if len(result[2]) != 0:
                                    self.send_response(200)
                                    self.end_headers()
                                    self.wfile.write(json.dumps(result).encode())
                                    return
                        except Exception as e:
                            self.send_error(400, str(e))
                            return

                self.send_response(200)
                self.end_headers()
                self.wfile.write(b"File uploaded successfully.")

    httpd = socketserver.TCPServer(("localhost", port), Handler)
    server_thread = threading.Thread(target=httpd.serve_forever)

    def shutdown_server():
        nonlocal httpd, server_thread
        httpd.shutdown()
        httpd.server_close()  # Close the socket
        server_thread.join()  # Wait for the thread to finish

    return file_name, download_url, upload_url, server_thread, shutdown_server
Starts a multi-purpose HTTP server to manage file interactions for a Zakat application.
This server facilitates the following functionalities:
- GET /{file_uuid}/get: Download the database file specified by database_path.
- GET /{file_uuid}/upload: Display an HTML form for uploading files.
- POST /{file_uuid}/upload: Handle file uploads, distinguishing between:
- Database File (.db): Replaces the existing database with the uploaded one.
- CSV File (.csv): Imports data from the CSV into the existing database.
Args:
    database_path (str): The path to the pickle database file.
    database_callback (callable, optional): A function called with the path of the uploaded database file before that file is copied over database_path. If it raises an exception, the upload is rejected with an HTTP 400 response.
    csv_callback (callable, optional): A function called with the uploaded CSV path, the database path, and the debug flag. If it raises an exception, the upload is rejected with an HTTP 400 response.
    debug (bool, optional): If True, print debugging information. Defaults to False.
Returns:
    Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing:
        - file_name (str): The name of the database file.
        - download_url (str): The URL to download the database file.
        - upload_url (str): The URL to access the file upload form.
        - server_thread (threading.Thread): The thread running the server.
        - shutdown_server (Callable[[], None]): A function to gracefully shut down the server.
Example:
    _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db")
    print(f"Download database: {download_url}")
    print(f"Upload files: {upload_url}")
    server_thread.start()
    # ... later ...
    shutdown_server()
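The upload handler in the source runs database_callback before copying the uploaded file into place, and turns any exception the callback raises into an HTTP 400 response. The following is a minimal sketch of a validating callback under that behavior. The name reject_empty_database and the "empty file is invalid" rule are hypothetical and not part of the library; a real check would more likely try to load the file with the application's own loader.

```python
import os
import tempfile


def reject_empty_database(uploaded_path: str) -> None:
    """Hypothetical database_callback: raise to reject the upload.

    Assumption (illustrative only): an empty file is never a valid database.
    """
    if os.path.getsize(uploaded_path) == 0:
        raise ValueError("uploaded database file is empty")


# Exercise the callback on two temporary files, without starting a server.
with tempfile.TemporaryDirectory() as tmp:
    empty_path = os.path.join(tmp, "empty.db")
    good_path = os.path.join(tmp, "good.db")
    open(empty_path, "wb").close()
    with open(good_path, "wb") as f:
        f.write(b"some bytes")

    # No exception: the server would go on to replace the database.
    reject_empty_database(good_path)

    try:
        reject_empty_database(empty_path)
        rejected = False
    except ValueError:
        # The server would answer 400 and keep the existing database.
        rejected = True
```

Such a callback would be passed by keyword, for example start_file_server("zakat.db", database_callback=reject_empty_database).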
def find_available_port() -> int:
    """
    Finds and returns an available TCP port on the local machine.

    This function utilizes a TCP server socket to bind to port 0, which
    instructs the operating system to automatically assign an available
    port. The assigned port is then extracted and returned.

    Returns:
        int: The available TCP port number.

    Raises:
        OSError: If an error occurs during the port binding process, such
            as all ports being in use.

    Example:
        port = find_available_port()
        print(f"Available port: {port}")
    """
    with socketserver.TCPServer(("localhost", 0), None) as s:
        return s.server_address[1]
Finds and returns an available TCP port on the local machine.
This function utilizes a TCP server socket to bind to port 0, which instructs the operating system to automatically assign an available port. The assigned port is then extracted and returned.
Returns:
    int: The available TCP port number.
Raises:
    OSError: If an error occurs during the port binding process, such as all ports being in use.
Example:
    port = find_available_port()
    print(f"Available port: {port}")
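The port-0 technique described above can also be sketched with a plain socket instead of socketserver.TCPServer. This is an illustrative equivalent, not the library's code; the helper name pick_free_port is an assumption made for the example.

```python
import socket


def pick_free_port(host: str = "127.0.0.1") -> int:
    """Illustrative helper: bind to port 0 so the OS picks a free TCP port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, 0))
        # getsockname() returns (address, port) for an AF_INET socket.
        return s.getsockname()[1]


port = pick_free_port()
print(f"Available port: {port}")
```

With either variant the probing socket is closed before the port is returned, so another process could in principle claim the port before the real server binds to it. For a short-lived local server this window is usually acceptable.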