zakat
xxx
_____ _ _ _ _ _
|__ /__ _| | ____ _| |_ | | (_) |__ _ __ __ _ _ __ _ _
/ // _| |/ / _
| __| | | | | '_ \| '__/ _` | '__| | | |
/ /| (_| | < (_| | |_ | |___| | |_) | | | (_| | | | |_| |
/______,_|_|___,_|__| |_____|_|_.__/|_| __,_|_| __, |
|___/
"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف ... Never Trust, Always Verify ...
This file provides the ZakatLibrary classes, functions for tracking and calculating Zakat.
1""" 2 _____ _ _ _ _ _ 3|__ /__ _| | ____ _| |_ | | (_) |__ _ __ __ _ _ __ _ _ 4 / // _` | |/ / _` | __| | | | | '_ \| '__/ _` | '__| | | | 5 / /| (_| | < (_| | |_ | |___| | |_) | | | (_| | | | |_| | 6/____\__,_|_|\_\__,_|\__| |_____|_|_.__/|_| \__,_|_| \__, | 7 |___/ 8 9"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف 10... Never Trust, Always Verify ... 11 12This file provides the ZakatLibrary classes, functions for tracking and calculating Zakat. 13""" 14# Importing necessary classes and functions from the main module 15from zakat.zakat_tracker import ( 16 ZakatTracker, 17 test, 18 Action, 19 JSONEncoder, 20 MathOperation, 21) 22 23from zakat.file_server import ( 24 start_file_server, 25 find_available_port, 26 FileType, 27) 28 29# Version information for the module 30__version__ = ZakatTracker.Version() 31__all__ = [ 32 "ZakatTracker", 33 "test", 34 "Action", 35 "JSONEncoder", 36 "MathOperation", 37 "start_file_server", 38 "find_available_port", 39 "FileType", 40]
123class ZakatTracker: 124 """ 125 A class for tracking and calculating Zakat. 126 127 This class provides functionalities for recording transactions, calculating Zakat due, 128 and managing account balances. It also offers features like importing transactions from 129 CSV files, exporting data to JSON format, and saving/loading the tracker state. 130 131 The `ZakatTracker` class is designed to handle both positive and negative transactions, 132 allowing for flexible tracking of financial activities related to Zakat. It also supports 133 the concept of a "Nisab" (minimum threshold for Zakat) and a "haul" (complete one year for Transaction) can calculate Zakat due 134 based on the current silver price. 135 136 The class uses a camel file as its database to persist the tracker state, 137 ensuring data integrity across sessions. It also provides options for enabling or 138 disabling history tracking, allowing users to choose their preferred level of detail. 139 140 In addition, the `ZakatTracker` class includes various helper methods like 141 `time`, `time_to_datetime`, `lock`, `free`, `recall`, `export_json`, 142 and more. These methods provide additional functionalities and flexibility 143 for interacting with and managing the Zakat tracker. 144 145 Attributes: 146 ZakatTracker.ZakatCut (function): A function to calculate the Zakat percentage. 147 ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat. 148 ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price. 149 ZakatTracker.Version (function): The version of the ZakatTracker class. 150 151 Data Structure: 152 The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data. 153 154 _vault (dict): 155 - account (dict): 156 - {account_number} (dict): 157 - balance (int): The current balance of the account. 158 - box (dict): A dictionary storing transaction details. 159 - {timestamp} (dict): 160 - capital (int): The initial amount of the transaction. 161 - count (int): The number of times Zakat has been calculated for this transaction. 162 - last (int): The timestamp of the last Zakat calculation. 163 - rest (int): The remaining amount after Zakat deductions and withdrawal. 164 - total (int): The total Zakat deducted from this transaction. 165 - count (int): The total number of transactions for the account. 166 - log (dict): A dictionary storing transaction logs. 167 - {timestamp} (dict): 168 - value (int): The transaction amount (positive or negative). 169 - desc (str): The description of the transaction. 170 - ref (int): The box reference (positive or None). 171 - file (dict): A dictionary storing file references associated with the transaction. 172 - hide (bool): Indicates whether the account is hidden or not. 173 - zakatable (bool): Indicates whether the account is subject to Zakat. 174 - exchange (dict): 175 - account (dict): 176 - {timestamps} (dict): 177 - rate (float): Exchange rate when compared to local currency. 178 - description (str): The description of the exchange rate. 179 - history (dict): 180 - {timestamp} (list): A list of dictionaries storing the history of actions performed. 181 - {action_dict} (dict): 182 - action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT). 183 - account (str): The account number associated with the action. 184 - ref (int): The reference number of the transaction. 185 - file (int): The reference number of the file (if applicable). 186 - key (str): The key associated with the action (e.g., 'rest', 'total'). 187 - value (int): The value associated with the action. 188 - math (MathOperation): The mathematical operation performed (if applicable). 189 - lock (int or None): The timestamp indicating the current lock status (None if not locked). 190 - report (dict): 191 - {timestamp} (tuple): A tuple storing Zakat report details. 192 193 """ 194 195 @staticmethod 196 def Version() -> str: 197 """ 198 Returns the current version of the software. 199 200 This function returns a string representing the current version of the software, 201 including major, minor, and patch version numbers in the format "X.Y.Z". 202 203 Returns: 204 str: The current version of the software. 205 """ 206 return '0.2.83' 207 208 @staticmethod 209 def ZakatCut(x: float) -> float: 210 """ 211 Calculates the Zakat amount due on an asset. 212 213 This function calculates the zakat amount due on a given asset value over one lunar year. 214 Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth 215 that exceeds a certain threshold (Nisab). 216 217 Parameters: 218 x: The total value of the asset on which Zakat is to be calculated. 219 220 Returns: 221 The amount of Zakat due on the asset, calculated as 2.5% of the asset's value. 222 """ 223 return 0.025 * x # Zakat Cut in one Lunar Year 224 225 @staticmethod 226 def TimeCycle(days: int = 355) -> int: 227 """ 228 Calculates the approximate duration of a lunar year in nanoseconds. 229 230 This function calculates the approximate duration of a lunar year based on the given number of days. 231 It converts the given number of days into nanoseconds for use in high-precision timing applications. 232 233 Parameters: 234 days: The number of days in a lunar year. Defaults to 355, 235 which is an approximation of the average length of a lunar year. 236 237 Returns: 238 The approximate duration of a lunar year in nanoseconds. 239 """ 240 return int(60 * 60 * 24 * days * 1e9) # Lunar Year in nanoseconds 241 242 @staticmethod 243 def Nisab(gram_price: float, gram_quantity: float = 595) -> float: 244 """ 245 Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram. 246 247 This function calculates the Nisab value, which is the minimum threshold of wealth, 248 that makes an individual liable for paying Zakat. 249 The Nisab value is determined by the equivalent value of a specific amount 250 of gold or silver (currently 595 grams in silver) in the local currency. 251 252 Parameters: 253 - gram_price (float): The price per gram of Nisab. 254 - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver. 255 256 Returns: 257 - float: The total value of Nisab based on the given price per gram. 258 """ 259 return gram_price * gram_quantity 260 261 @staticmethod 262 def ext() -> str: 263 """ 264 Returns the file extension used by the ZakatTracker class. 265 266 Returns: 267 str: The file extension used by the ZakatTracker class, which is 'camel'. 268 """ 269 return 'camel' 270 271 def __init__(self, db_path: str = "zakat.camel", history_mode: bool = True): 272 """ 273 Initialize ZakatTracker with database path and history mode. 274 275 Parameters: 276 db_path (str): The path to the database file. Default is "zakat.camel". 277 history_mode (bool): The mode for tracking history. Default is True. 278 279 Returns: 280 None 281 """ 282 self._base_path = None 283 self._vault_path = None 284 self._vault = None 285 self.reset() 286 self._history(history_mode) 287 self.path(db_path) 288 289 def path(self, path: str = None) -> str: 290 """ 291 Set or get the path to the database file. 292 293 If no path is provided, the current path is returned. 294 If a path is provided, it is set as the new path. 295 The function also creates the necessary directories if the provided path is a file. 296 297 Parameters: 298 path (str): The new path to the database file. If not provided, the current path is returned. 299 300 Returns: 301 str: The current or new path to the database file. 302 """ 303 if path is None: 304 return self._vault_path 305 self._vault_path = Path(path).resolve() 306 base_path = Path(path).resolve() 307 if base_path.is_file() or base_path.suffix: 308 base_path = base_path.parent 309 base_path.mkdir(parents=True, exist_ok=True) 310 self._base_path = base_path 311 return str(self._vault_path) 312 313 def base_path(self, *args) -> str: 314 """ 315 Generate a base path by joining the provided arguments with the existing base path. 316 317 Parameters: 318 *args (str): Variable length argument list of strings to be joined with the base path. 319 320 Returns: 321 str: The generated base path. If no arguments are provided, the existing base path is returned. 322 """ 323 if not args: 324 return str(self._base_path) 325 filtered_args = [] 326 ignored_filename = None 327 for arg in args: 328 if Path(arg).suffix: 329 ignored_filename = arg 330 else: 331 filtered_args.append(arg) 332 base_path = Path(self._base_path) 333 full_path = base_path.joinpath(*filtered_args) 334 full_path.mkdir(parents=True, exist_ok=True) 335 if ignored_filename is not None: 336 return full_path.resolve() / ignored_filename # Join with the ignored filename 337 return str(full_path.resolve()) 338 339 @staticmethod 340 def scale(x: float | int | Decimal, decimal_places: int = 2) -> int: 341 """ 342 Scales a numerical value by a specified power of 10, returning an integer. 343 344 This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and 345 facilitate precise scaling operations, particularly useful in financial or scientific calculations. 346 347 Parameters: 348 x: The numeric value to scale. Can be a floating-point number, integer, or decimal. 349 decimal_places: The exponent for the scaling factor (10**y). Defaults to 2, meaning the input is scaled 350 by a factor of 100 (e.g., converts 1.23 to 123). 351 352 Returns: 353 The scaled value, rounded to the nearest integer. 354 355 Raises: 356 TypeError: If the input `x` is not a valid numeric type. 357 358 Examples: 359 >>> ZakatTracker.scale(3.14159) 360 314 361 >>> ZakatTracker.scale(1234, decimal_places=3) 362 1234000 363 >>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4) 364 50 365 """ 366 if not isinstance(x, (float, int, Decimal)): 367 raise TypeError("Input 'x' must be a float, int, or Decimal.") 368 return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places)) 369 370 @staticmethod 371 def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal: 372 """ 373 Unscales an integer by a power of 10. 374 375 Parameters: 376 x: The integer to unscale. 377 return_type: The desired type for the returned value. Can be float, int, or Decimal. Defaults to float. 378 decimal_places: The power of 10 to use. Defaults to 2. 379 380 Returns: 381 The unscaled number, converted to the specified return_type. 382 383 Raises: 384 TypeError: If the return_type is not float or Decimal. 385 """ 386 if return_type not in (float, Decimal): 387 raise TypeError(f'Invalid return_type({return_type}). Supported types are float, int, and Decimal.') 388 return round(return_type(x / (10 ** decimal_places)), decimal_places) 389 390 def _history(self, status: bool = None) -> bool: 391 """ 392 Enable or disable history tracking. 393 394 Parameters: 395 status (bool): The status of history tracking. Default is True. 396 397 Returns: 398 None 399 """ 400 if status is not None: 401 self._history_mode = status 402 return self._history_mode 403 404 def reset(self) -> None: 405 """ 406 Reset the internal data structure to its initial state. 407 408 Parameters: 409 None 410 411 Returns: 412 None 413 """ 414 self._vault = { 415 'account': {}, 416 'exchange': {}, 417 'history': {}, 418 'lock': None, 419 'report': {}, 420 } 421 422 @staticmethod 423 def time(now: datetime = None) -> int: 424 """ 425 Generates a timestamp based on the provided datetime object or the current datetime. 426 427 Parameters: 428 now (datetime, optional): The datetime object to generate the timestamp from. 429 If not provided, the current datetime is used. 430 431 Returns: 432 int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970), 433 before 1970 will return in negative until 1000AD. 434 """ 435 if now is None: 436 now = datetime.datetime.now() 437 ordinal_day = now.toordinal() 438 ns_in_day = (now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() * 10 ** 9 439 return int((ordinal_day - 719_163) * 86_400_000_000_000 + ns_in_day) 440 441 @staticmethod 442 def time_to_datetime(ordinal_ns: int) -> datetime: 443 """ 444 Converts an ordinal number (number of days since 1000-01-01) to a datetime object. 445 446 Parameters: 447 ordinal_ns (int): The ordinal number of days since 1000-01-01. 448 449 Returns: 450 datetime: The corresponding datetime object. 451 """ 452 ordinal_day = ordinal_ns // 86_400_000_000_000 + 719_163 453 ns_in_day = ordinal_ns % 86_400_000_000_000 454 d = datetime.datetime.fromordinal(ordinal_day) 455 t = datetime.timedelta(seconds=ns_in_day // 10 ** 9) 456 return datetime.datetime.combine(d, datetime.time()) + t 457 458 def clean_history(self, lock: int | None = None) -> int: 459 """ 460 Cleans up the history of actions performed on the ZakatTracker instance. 461 462 Parameters: 463 lock (int, optional): The lock ID is used to clean up the empty history. 464 If not provided, it cleans up the empty history records for all locks. 465 466 Returns: 467 int: The number of locks cleaned up. 468 """ 469 count = 0 470 if lock in self._vault['history']: 471 if len(self._vault['history'][lock]) <= 0: 472 count += 1 473 del self._vault['history'][lock] 474 return count 475 self.free(self.lock()) 476 for lock in self._vault['history']: 477 if len(self._vault['history'][lock]) <= 0: 478 count += 1 479 del self._vault['history'][lock] 480 return count 481 482 def _step(self, action: Action = None, account=None, ref: int = None, file: int = None, value: float = None, 483 key: str = None, math_operation: MathOperation = None) -> int: 484 """ 485 This method is responsible for recording the actions performed on the ZakatTracker. 486 487 Parameters: 488 - action (Action): The type of action performed. 489 - account (str): The account number on which the action was performed. 490 - ref (int): The reference number of the action. 491 - file (int): The file reference number of the action. 492 - value (int): The value associated with the action. 493 - key (str): The key associated with the action. 494 - math_operation (MathOperation): The mathematical operation performed during the action. 495 496 Returns: 497 - int: The lock time of the recorded action. If no lock was performed, it returns 0. 498 """ 499 if not self._history(): 500 return 0 501 lock = self._vault['lock'] 502 if self.nolock(): 503 lock = self._vault['lock'] = self.time() 504 self._vault['history'][lock] = [] 505 if action is None: 506 return lock 507 self._vault['history'][lock].append({ 508 'action': action, 509 'account': account, 510 'ref': ref, 511 'file': file, 512 'key': key, 513 'value': value, 514 'math': math_operation, 515 }) 516 return lock 517 518 def nolock(self) -> bool: 519 """ 520 Check if the vault lock is currently not set. 521 522 Returns: 523 bool: True if the vault lock is not set, False otherwise. 524 """ 525 return self._vault['lock'] is None 526 527 def lock(self) -> int: 528 """ 529 Acquires a lock on the ZakatTracker instance. 530 531 Returns: 532 int: The lock ID. This ID can be used to release the lock later. 533 """ 534 return self._step() 535 536 def vault(self) -> dict: 537 """ 538 Returns a copy of the internal vault dictionary. 539 540 This method is used to retrieve the current state of the ZakatTracker object. 541 It provides a snapshot of the internal data structure, allowing for further 542 processing or analysis. 543 544 Returns: 545 dict: A copy of the internal vault dictionary. 546 """ 547 return self._vault.copy() 548 549 def stats(self) -> dict[str, tuple]: 550 """ 551 Calculates and returns statistics about the object's data storage. 552 553 This method determines the size of the database file on disk and the 554 size of the data currently held in RAM (likely within a dictionary). 555 Both sizes are reported in bytes and in a human-readable format 556 (e.g., KB, MB). 557 558 Returns: 559 dict[str, tuple]: A dictionary containing the following statistics: 560 561 * 'database': A tuple with two elements: 562 - The database file size in bytes (int). 563 - The database file size in human-readable format (str). 564 * 'ram': A tuple with two elements: 565 - The RAM usage (dictionary size) in bytes (int). 566 - The RAM usage in human-readable format (str). 567 568 Example: 569 >>> stats = my_object.stats() 570 >>> print(stats['database']) 571 (256000, '250.0 KB') 572 >>> print(stats['ram']) 573 (12345, '12.1 KB') 574 """ 575 ram_size = self.get_dict_size(self.vault()) 576 file_size = os.path.getsize(self.path()) 577 return { 578 'database': (file_size, self.human_readable_size(file_size)), 579 'ram': (ram_size, self.human_readable_size(ram_size)), 580 } 581 582 def files(self) -> list[dict[str, str | int]]: 583 """ 584 Retrieves information about files associated with this class. 585 586 This class method provides a standardized way to gather details about 587 files used by the class for storage, snapshots, and CSV imports. 588 589 Returns: 590 list[dict[str, str | int]]: A list of dictionaries, each containing information 591 about a specific file: 592 593 * type (str): The type of file ('database', 'snapshot', 'import_csv'). 594 * path (str): The full file path. 595 * exists (bool): Whether the file exists on the filesystem. 596 * size (int): The file size in bytes (0 if the file doesn't exist). 597 * human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB'). 598 599 Example: 600 ``` 601 file_info = MyClass.files() 602 for info in file_info: 603 print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}") 604 ``` 605 """ 606 result = [] 607 for file_type, path in { 608 'database': self.path(), 609 'snapshot': self.snapshot_cache_path(), 610 'import_csv': self.import_csv_cache_path(), 611 }.items(): 612 exists = os.path.exists(path) 613 size = os.path.getsize(path) if exists else 0 614 human_readable_size = self.human_readable_size(size) if exists else 0 615 result.append({ 616 'type': file_type, 617 'path': path, 618 'exists': exists, 619 'size': size, 620 'human_readable_size': human_readable_size, 621 }) 622 return result 623 624 def steps(self) -> dict: 625 """ 626 Returns a copy of the history of steps taken in the ZakatTracker. 627 628 The history is a dictionary where each key is a unique identifier for a step, 629 and the corresponding value is a dictionary containing information about the step. 630 631 Returns: 632 dict: A copy of the history of steps taken in the ZakatTracker. 633 """ 634 return self._vault['history'].copy() 635 636 def free(self, lock: int, auto_save: bool = True) -> bool: 637 """ 638 Releases the lock on the database. 639 640 Parameters: 641 lock (int): The lock ID to be released. 642 auto_save (bool): Whether to automatically save the database after releasing the lock. 643 644 Returns: 645 bool: True if the lock is successfully released and (optionally) saved, False otherwise. 646 """ 647 if lock == self._vault['lock']: 648 self._vault['lock'] = None 649 self.clean_history(lock) 650 if auto_save: 651 return self.save(self.path()) 652 return True 653 return False 654 655 def account_exists(self, account) -> bool: 656 """ 657 Check if the given account exists in the vault. 658 659 Parameters: 660 account (str): The account number to check. 661 662 Returns: 663 bool: True if the account exists, False otherwise. 664 """ 665 return account in self._vault['account'] 666 667 def box_size(self, account) -> int: 668 """ 669 Calculate the size of the box for a specific account. 670 671 Parameters: 672 account (str): The account number for which the box size needs to be calculated. 673 674 Returns: 675 int: The size of the box for the given account. If the account does not exist, -1 is returned. 676 """ 677 if self.account_exists(account): 678 return len(self._vault['account'][account]['box']) 679 return -1 680 681 def log_size(self, account) -> int: 682 """ 683 Get the size of the log for a specific account. 684 685 Parameters: 686 account (str): The account number for which the log size needs to be calculated. 687 688 Returns: 689 int: The size of the log for the given account. If the account does not exist, -1 is returned. 690 """ 691 if self.account_exists(account): 692 return len(self._vault['account'][account]['log']) 693 return -1 694 695 @staticmethod 696 def file_hash(file_path: str, algorithm: str = "blake2b") -> str: 697 """ 698 Calculates the hash of a file using the specified algorithm. 699 700 Parameters: 701 file_path (str): The path to the file. 702 algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b". 703 704 Returns: 705 str: The hexadecimal representation of the file's hash. 706 """ 707 hash_obj = hashlib.new(algorithm) # Create the hash object 708 with open(file_path, "rb") as f: # Open file in binary mode for reading 709 for chunk in iter(lambda: f.read(4096), b""): # Read file in chunks 710 hash_obj.update(chunk) 711 return hash_obj.hexdigest() # Return the hash as a hexadecimal string 712 713 def snapshot_cache_path(self): 714 """ 715 Generate the path for the cache file used to store snapshots. 716 717 The cache file is a camel file that stores the timestamps of the snapshots. 718 The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel". 719 720 Returns: 721 str: The path to the cache file. 722 """ 723 path = str(self.path()) 724 ext = self.ext() 725 ext_len = len(ext) 726 if path.endswith(f'.{ext}'): 727 path = path[:-ext_len-1] 728 return path + f'.snapshots.{ext}' 729 730 def snapshot(self) -> bool: 731 """ 732 This function creates a snapshot of the current database state. 733 734 The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. 735 If a snapshot with the same hash exists, the function returns True without creating a new snapshot. 736 If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state 737 in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp. 738 739 Parameters: 740 None 741 742 Returns: 743 bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails. 744 """ 745 current_hash = self.file_hash(self.path()) 746 cache: dict[str, int] = {} # hash: time_ns 747 try: 748 with open(self.snapshot_cache_path(), 'r') as stream: 749 cache = camel.load(stream.read()) 750 except: 751 pass 752 if current_hash in cache: 753 return True 754 time = time_ns() 755 cache[current_hash] = time 756 if not self.save(self.base_path('snapshots', f'{time}.{self.ext()}')): 757 return False 758 with open(self.snapshot_cache_path(), 'w') as stream: 759 stream.write(camel.dump(cache)) 760 return True 761 762 def snapshots(self, hide_missing: bool = True, verified_hash_only: bool = False) \ 763 -> dict[int, tuple[str, str, bool]]: 764 """ 765 Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status. 766 767 Parameters: 768 - hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True. 769 - verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False. 770 771 Returns: 772 - dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, 773 and the values are tuples containing the snapshot's hash, path, and existence status. 774 """ 775 cache: dict[str, int] = {} # hash: time_ns 776 try: 777 with open(self.snapshot_cache_path(), 'r') as stream: 778 cache = camel.load(stream.read()) 779 except: 780 pass 781 if not cache: 782 return {} 783 result: dict[int, tuple[str, str, bool]] = {} # time_ns: (hash, path, exists) 784 for file_hash, ref in cache.items(): 785 path = self.base_path('snapshots', f'{ref}.{self.ext()}') 786 exists = os.path.exists(path) 787 valid_hash = self.file_hash(path) == file_hash if verified_hash_only else True 788 if (verified_hash_only and not valid_hash) or (verified_hash_only and not exists): 789 continue 790 if exists or not hide_missing: 791 result[ref] = (file_hash, path, exists) 792 return result 793 794 def recall(self, dry=True, debug=False) -> bool: 795 """ 796 Revert the last operation. 797 798 Parameters: 799 dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. 800 debug (bool): If True, the function will print debug information. Default is False. 801 802 Returns: 803 bool: True if the operation was successful, False otherwise. 804 """ 805 if not self.nolock() or len(self._vault['history']) == 0: 806 return False 807 if len(self._vault['history']) <= 0: 808 return False 809 ref = sorted(self._vault['history'].keys())[-1] 810 if debug: 811 print('recall', ref) 812 memory = self._vault['history'][ref] 813 if debug: 814 print(type(memory), 'memory', memory) 815 limit = len(memory) + 1 816 sub_positive_log_negative = 0 817 for i in range(-1, -limit, -1): 818 x = memory[i] 819 if debug: 820 print(type(x), x) 821 match x['action']: 822 case Action.CREATE: 823 if x['account'] is not None: 824 if self.account_exists(x['account']): 825 if debug: 826 print('account', self._vault['account'][x['account']]) 827 assert len(self._vault['account'][x['account']]['box']) == 0 828 assert self._vault['account'][x['account']]['balance'] == 0 829 assert self._vault['account'][x['account']]['count'] == 0 830 if dry: 831 continue 832 del self._vault['account'][x['account']] 833 834 case Action.TRACK: 835 if x['account'] is not None: 836 if self.account_exists(x['account']): 837 if dry: 838 continue 839 self._vault['account'][x['account']]['balance'] -= x['value'] 840 self._vault['account'][x['account']]['count'] -= 1 841 del self._vault['account'][x['account']]['box'][x['ref']] 842 843 case Action.LOG: 844 if x['account'] is not None: 845 if self.account_exists(x['account']): 846 if x['ref'] in self._vault['account'][x['account']]['log']: 847 if dry: 848 continue 849 if sub_positive_log_negative == -x['value']: 850 self._vault['account'][x['account']]['count'] -= 1 851 sub_positive_log_negative = 0 852 box_ref = self._vault['account'][x['account']]['log'][x['ref']]['ref'] 853 if not box_ref is None: 854 assert self.box_exists(x['account'], box_ref) 855 box_value = self._vault['account'][x['account']]['log'][x['ref']]['value'] 856 assert box_value < 0 857 858 try: 859 self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value 860 except TypeError: 861 self._vault['account'][x['account']]['box'][box_ref]['rest'] += Decimal( 862 -box_value) 863 864 try: 865 self._vault['account'][x['account']]['balance'] += -box_value 866 except TypeError: 867 self._vault['account'][x['account']]['balance'] += Decimal(-box_value) 868 869 self._vault['account'][x['account']]['count'] -= 1 870 del self._vault['account'][x['account']]['log'][x['ref']] 871 872 case Action.SUB: 873 if x['account'] is not None: 874 if self.account_exists(x['account']): 875 if x['ref'] in self._vault['account'][x['account']]['box']: 876 if dry: 877 continue 878 self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value'] 879 self._vault['account'][x['account']]['balance'] += x['value'] 880 sub_positive_log_negative = x['value'] 881 882 case Action.ADD_FILE: 883 if x['account'] is not None: 884 if self.account_exists(x['account']): 885 if x['ref'] in self._vault['account'][x['account']]['log']: 886 if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']: 887 if dry: 888 continue 889 del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] 890 891 case Action.REMOVE_FILE: 892 if x['account'] is not None: 893 if self.account_exists(x['account']): 894 if x['ref'] in self._vault['account'][x['account']]['log']: 895 if dry: 896 continue 897 self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value'] 898 899 case Action.BOX_TRANSFER: 900 if x['account'] is not None: 901 if self.account_exists(x['account']): 902 if x['ref'] in self._vault['account'][x['account']]['box']: 903 if dry: 904 continue 905 self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value'] 906 907 case Action.EXCHANGE: 908 if x['account'] is not None: 909 if x['account'] in self._vault['exchange']: 910 if x['ref'] in self._vault['exchange'][x['account']]: 911 if dry: 912 continue 913 del self._vault['exchange'][x['account']][x['ref']] 914 915 case Action.REPORT: 916 if x['ref'] in self._vault['report']: 917 if dry: 918 continue 919 del self._vault['report'][x['ref']] 920 921 case Action.ZAKAT: 922 if x['account'] is not None: 923 if self.account_exists(x['account']): 924 if x['ref'] in self._vault['account'][x['account']]['box']: 925 if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]: 926 if dry: 927 continue 928 match x['math']: 929 case MathOperation.ADDITION: 930 self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x[ 931 'value'] 932 case MathOperation.EQUAL: 933 self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value'] 934 case MathOperation.SUBTRACTION: 935 self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x[ 936 'value'] 937 938 if not dry: 939 del self._vault['history'][ref] 940 return True 941 942 def ref_exists(self, account: str, ref_type: str, ref: int) -> bool: 943 """ 944 Check if a specific reference (transaction) exists in the vault for a given account and reference type. 945 946 Parameters: 947 account (str): The account number for which to check the existence of the reference. 948 ref_type (str): The type of reference (e.g., 'box', 'log', etc.). 949 ref (int): The reference (transaction) number to check for existence. 950 951 Returns: 952 bool: True if the reference exists for the given account and reference type, False otherwise. 953 """ 954 if account in self._vault['account']: 955 return ref in self._vault['account'][account][ref_type] 956 return False 957 958 def box_exists(self, account: str, ref: int) -> bool: 959 """ 960 Check if a specific box (transaction) exists in the vault for a given account and reference. 961 962 Parameters: 963 - account (str): The account number for which to check the existence of the box. 964 - ref (int): The reference (transaction) number to check for existence. 965 966 Returns: 967 - bool: True if the box exists for the given account and reference, False otherwise. 968 """ 969 return self.ref_exists(account, 'box', ref) 970 971 def track(self, unscaled_value: float | int | Decimal = 0, desc: str = '', account: str = 1, logging: bool = True, 972 created: int = None, 973 debug: bool = False) -> int: 974 """ 975 This function tracks a transaction for a specific account. 976 977 Parameters: 978 unscaled_value (float | int | Decimal): The value of the transaction. Default is 0. 979 desc (str): The description of the transaction. Default is an empty string. 980 account (str): The account for which the transaction is being tracked. Default is '1'. 981 logging (bool): Whether to log the transaction. Default is True. 982 created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. 983 debug (bool): Whether to print debug information. Default is False. 984 985 Returns: 986 int: The timestamp of the transaction. 987 988 This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box. 989 990 Raises: 991 ValueError: The log transaction happened again in the same nanosecond time. 992 ValueError: The box transaction happened again in the same nanosecond time. 993 """ 994 if debug: 995 print('track', f'unscaled_value={unscaled_value}, debug={debug}') 996 if created is None: 997 created = self.time() 998 no_lock = self.nolock() 999 self.lock() 1000 if not self.account_exists(account): 1001 if debug: 1002 print(f"account {account} created") 1003 self._vault['account'][account] = { 1004 'balance': 0, 1005 'box': {}, 1006 'count': 0, 1007 'log': {}, 1008 'hide': False, 1009 'zakatable': True, 1010 } 1011 self._step(Action.CREATE, account) 1012 if unscaled_value == 0: 1013 if no_lock: 1014 self.free(self.lock()) 1015 return 0 1016 value = self.scale(unscaled_value) 1017 if logging: 1018 self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug) 1019 if debug: 1020 print('create-box', created) 1021 if self.box_exists(account, created): 1022 raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).") 1023 if debug: 1024 print('created-box', created) 1025 self._vault['account'][account]['box'][created] = { 1026 'capital': value, 1027 'count': 0, 1028 'last': 0, 1029 'rest': value, 1030 'total': 0, 1031 } 1032 self._step(Action.TRACK, account, ref=created, value=value) 1033 if no_lock: 1034 self.free(self.lock()) 1035 return created 1036 1037 def log_exists(self, account: str, ref: int) -> bool: 1038 """ 1039 Checks if a specific transaction log entry exists for a given account. 1040 1041 Parameters: 1042 account (str): The account number associated with the transaction log. 1043 ref (int): The reference to the transaction log entry. 1044 1045 Returns: 1046 bool: True if the transaction log entry exists, False otherwise. 1047 """ 1048 return self.ref_exists(account, 'log', ref) 1049 1050 def _log(self, value: float, desc: str = '', account: str = 1, created: int = None, ref: int = None, 1051 debug: bool = False) -> int: 1052 """ 1053 Log a transaction into the account's log. 1054 1055 Parameters: 1056 value (float): The value of the transaction. 1057 desc (str): The description of the transaction. 1058 account (str): The account to log the transaction into. Default is '1'. 1059 created (int): The timestamp of the transaction. If not provided, it will be generated. 1060 1061 Returns: 1062 int: The timestamp of the logged transaction. 1063 1064 This method updates the account's balance, count, and log with the transaction details. 1065 It also creates a step in the history of the transaction. 1066 1067 Raises: 1068 ValueError: The log transaction happened again in the same nanosecond time. 1069 """ 1070 if debug: 1071 print('_log', f'debug={debug}') 1072 if created is None: 1073 created = self.time() 1074 try: 1075 self._vault['account'][account]['balance'] += value 1076 except TypeError: 1077 self._vault['account'][account]['balance'] += Decimal(value) 1078 self._vault['account'][account]['count'] += 1 1079 if debug: 1080 print('create-log', created) 1081 if self.log_exists(account, created): 1082 raise ValueError(f"The log transaction happened again in the same nanosecond time({created}).") 1083 if debug: 1084 print('created-log', created) 1085 self._vault['account'][account]['log'][created] = { 1086 'value': value, 1087 'desc': desc, 1088 'ref': ref, 1089 'file': {}, 1090 } 1091 self._step(Action.LOG, account, ref=created, value=value) 1092 return created 1093 1094 def exchange(self, account, created: int = None, rate: float = None, description: str = None, 1095 debug: bool = False) -> dict: 1096 """ 1097 This method is used to record or retrieve exchange rates for a specific account. 1098 1099 Parameters: 1100 - account (str): The account number for which the exchange rate is being recorded or retrieved. 1101 - created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used. 1102 - rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate. 1103 - description (str): A description of the exchange rate. 1104 1105 Returns: 1106 - dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, 1107 it returns a dictionary with default values for the rate and description. 1108 """ 1109 if debug: 1110 print('exchange', f'debug={debug}') 1111 if created is None: 1112 created = self.time() 1113 no_lock = self.nolock() 1114 self.lock() 1115 if rate is not None: 1116 if rate <= 0: 1117 return dict() 1118 if account not in self._vault['exchange']: 1119 self._vault['exchange'][account] = {} 1120 if len(self._vault['exchange'][account]) == 0 and rate <= 1: 1121 return {"time": created, "rate": 1, "description": None} 1122 self._vault['exchange'][account][created] = {"rate": rate, "description": description} 1123 self._step(Action.EXCHANGE, account, ref=created, value=rate) 1124 if no_lock: 1125 self.free(self.lock()) 1126 if debug: 1127 print("exchange-created-1", 1128 f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1129 1130 if account in self._vault['exchange']: 1131 valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created] 1132 if valid_rates: 1133 latest_rate = max(valid_rates, key=lambda x: x[0]) 1134 if debug: 1135 print("exchange-read-1", 1136 f'account: {account}, created: {created}, rate:{rate}, description:{description}', 1137 'latest_rate', latest_rate) 1138 result = latest_rate[1] 1139 result['time'] = latest_rate[0] 1140 return result # إرجاع قاموس يحتوي على المعدل والوصف 1141 if debug: 1142 print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1143 return {"time": created, "rate": 1, "description": None} # إرجاع القيمة الافتراضية مع وصف فارغ 1144 1145 @staticmethod 1146 def exchange_calc(x: float, x_rate: float, y_rate: float) -> float: 1147 """ 1148 This function calculates the exchanged amount of a currency. 1149 1150 Args: 1151 x (float): The original amount of the currency. 1152 x_rate (float): The exchange rate of the original currency. 1153 y_rate (float): The exchange rate of the target currency. 1154 1155 Returns: 1156 float: The exchanged amount of the target currency. 1157 """ 1158 return (x * x_rate) / y_rate 1159 1160 def exchanges(self) -> dict: 1161 """ 1162 Retrieve the recorded exchange rates for all accounts. 1163 1164 Parameters: 1165 None 1166 1167 Returns: 1168 dict: A dictionary containing all recorded exchange rates. 1169 The keys are account names or numbers, and the values are dictionaries containing the exchange rates. 1170 Each exchange rate dictionary has timestamps as keys and exchange rate details as values. 1171 """ 1172 return self._vault['exchange'].copy() 1173 1174 def accounts(self) -> dict: 1175 """ 1176 Returns a dictionary containing account numbers as keys and their respective balances as values. 1177 1178 Parameters: 1179 None 1180 1181 Returns: 1182 dict: A dictionary where keys are account numbers and values are their respective balances. 1183 """ 1184 result = {} 1185 for i in self._vault['account']: 1186 result[i] = self._vault['account'][i]['balance'] 1187 return result 1188 1189 def boxes(self, account) -> dict: 1190 """ 1191 Retrieve the boxes (transactions) associated with a specific account. 1192 1193 Parameters: 1194 account (str): The account number for which to retrieve the boxes. 1195 1196 Returns: 1197 dict: A dictionary containing the boxes associated with the given account. 1198 If the account does not exist, an empty dictionary is returned. 1199 """ 1200 if self.account_exists(account): 1201 return self._vault['account'][account]['box'] 1202 return {} 1203 1204 def logs(self, account) -> dict: 1205 """ 1206 Retrieve the logs (transactions) associated with a specific account. 1207 1208 Parameters: 1209 account (str): The account number for which to retrieve the logs. 1210 1211 Returns: 1212 dict: A dictionary containing the logs associated with the given account. 1213 If the account does not exist, an empty dictionary is returned. 1214 """ 1215 if self.account_exists(account): 1216 return self._vault['account'][account]['log'] 1217 return {} 1218 1219 def add_file(self, account: str, ref: int, path: str) -> int: 1220 """ 1221 Adds a file reference to a specific transaction log entry in the vault. 1222 1223 Parameters: 1224 account (str): The account number associated with the transaction log. 1225 ref (int): The reference to the transaction log entry. 1226 path (str): The path of the file to be added. 1227 1228 Returns: 1229 int: The reference of the added file. If the account or transaction log entry does not exist, returns 0. 1230 """ 1231 if self.account_exists(account): 1232 if ref in self._vault['account'][account]['log']: 1233 file_ref = self.time() 1234 self._vault['account'][account]['log'][ref]['file'][file_ref] = path 1235 no_lock = self.nolock() 1236 self.lock() 1237 self._step(Action.ADD_FILE, account, ref=ref, file=file_ref) 1238 if no_lock: 1239 self.free(self.lock()) 1240 return file_ref 1241 return 0 1242 1243 def remove_file(self, account: str, ref: int, file_ref: int) -> bool: 1244 """ 1245 Removes a file reference from a specific transaction log entry in the vault. 1246 1247 Parameters: 1248 account (str): The account number associated with the transaction log. 1249 ref (int): The reference to the transaction log entry. 1250 file_ref (int): The reference of the file to be removed. 1251 1252 Returns: 1253 bool: True if the file reference is successfully removed, False otherwise. 1254 """ 1255 if self.account_exists(account): 1256 if ref in self._vault['account'][account]['log']: 1257 if file_ref in self._vault['account'][account]['log'][ref]['file']: 1258 x = self._vault['account'][account]['log'][ref]['file'][file_ref] 1259 del self._vault['account'][account]['log'][ref]['file'][file_ref] 1260 no_lock = self.nolock() 1261 self.lock() 1262 self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x) 1263 if no_lock: 1264 self.free(self.lock()) 1265 return True 1266 return False 1267 1268 def balance(self, account: str = 1, cached: bool = True) -> int: 1269 """ 1270 Calculate and return the balance of a specific account. 1271 1272 Parameters: 1273 account (str): The account number. Default is '1'. 1274 cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True. 1275 1276 Returns: 1277 int: The balance of the account. 1278 1279 Note: 1280 If cached is True, the function returns the cached balance. 1281 If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items. 1282 """ 1283 if cached: 1284 return self._vault['account'][account]['balance'] 1285 x = 0 1286 return [x := x + y['rest'] for y in self._vault['account'][account]['box'].values()][-1] 1287 1288 def hide(self, account, status: bool = None) -> bool: 1289 """ 1290 Check or set the hide status of a specific account. 1291 1292 Parameters: 1293 account (str): The account number. 1294 status (bool, optional): The new hide status. If not provided, the function will return the current status. 1295 1296 Returns: 1297 bool: The current or updated hide status of the account. 1298 1299 Raises: 1300 None 1301 1302 Example: 1303 >>> tracker = ZakatTracker() 1304 >>> ref = tracker.track(51, 'desc', 'account1') 1305 >>> tracker.hide('account1') # Set the hide status of 'account1' to True 1306 False 1307 >>> tracker.hide('account1', True) # Set the hide status of 'account1' to True 1308 True 1309 >>> tracker.hide('account1') # Get the hide status of 'account1' by default 1310 True 1311 >>> tracker.hide('account1', False) 1312 False 1313 """ 1314 if self.account_exists(account): 1315 if status is None: 1316 return self._vault['account'][account]['hide'] 1317 self._vault['account'][account]['hide'] = status 1318 return status 1319 return False 1320 1321 def zakatable(self, account, status: bool = None) -> bool: 1322 """ 1323 Check or set the zakatable status of a specific account. 1324 1325 Parameters: 1326 account (str): The account number. 1327 status (bool, optional): The new zakatable status. If not provided, the function will return the current status. 1328 1329 Returns: 1330 bool: The current or updated zakatable status of the account. 1331 1332 Raises: 1333 None 1334 1335 Example: 1336 >>> tracker = ZakatTracker() 1337 >>> ref = tracker.track(51, 'desc', 'account1') 1338 >>> tracker.zakatable('account1') # Set the zakatable status of 'account1' to True 1339 True 1340 >>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True 1341 True 1342 >>> tracker.zakatable('account1') # Get the zakatable status of 'account1' by default 1343 True 1344 >>> tracker.zakatable('account1', False) 1345 False 1346 """ 1347 if self.account_exists(account): 1348 if status is None: 1349 return self._vault['account'][account]['zakatable'] 1350 self._vault['account'][account]['zakatable'] = status 1351 return status 1352 return False 1353 1354 def sub(self, unscaled_value: float | int | Decimal, desc: str = '', account: str = 1, created: int = None, 1355 debug: bool = False) \ 1356 -> tuple[ 1357 int, 1358 list[ 1359 tuple[int, int], 1360 ], 1361 ] | tuple: 1362 """ 1363 Subtracts a specified value from an account's balance. 1364 1365 Parameters: 1366 unscaled_value (float | int | Decimal): The amount to be subtracted. 1367 desc (str): A description for the transaction. Defaults to an empty string. 1368 account (str): The account from which the value will be subtracted. Defaults to '1'. 1369 created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. 1370 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1371 1372 Returns: 1373 tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction. 1374 1375 If the amount to subtract is greater than the account's balance, 1376 the remaining amount will be transferred to a new transaction with a negative value. 1377 1378 Raises: 1379 ValueError: The box transaction happened again in the same nanosecond time. 1380 ValueError: The log transaction happened again in the same nanosecond time. 1381 """ 1382 if debug: 1383 print('sub', f'debug={debug}') 1384 if unscaled_value < 0: 1385 return tuple() 1386 if unscaled_value == 0: 1387 ref = self.track(unscaled_value, '', account) 1388 return ref, ref 1389 if created is None: 1390 created = self.time() 1391 no_lock = self.nolock() 1392 self.lock() 1393 self.track(0, '', account) 1394 value = self.scale(unscaled_value) 1395 self._log(value=-value, desc=desc, account=account, created=created, ref=None, debug=debug) 1396 ids = sorted(self._vault['account'][account]['box'].keys()) 1397 limit = len(ids) + 1 1398 target = value 1399 if debug: 1400 print('ids', ids) 1401 ages = [] 1402 for i in range(-1, -limit, -1): 1403 if target == 0: 1404 break 1405 j = ids[i] 1406 if debug: 1407 print('i', i, 'j', j) 1408 rest = self._vault['account'][account]['box'][j]['rest'] 1409 if rest >= target: 1410 self._vault['account'][account]['box'][j]['rest'] -= target 1411 self._step(Action.SUB, account, ref=j, value=target) 1412 ages.append((j, target)) 1413 target = 0 1414 break 1415 elif target > rest > 0: 1416 chunk = rest 1417 target -= chunk 1418 self._step(Action.SUB, account, ref=j, value=chunk) 1419 ages.append((j, chunk)) 1420 self._vault['account'][account]['box'][j]['rest'] = 0 1421 if target > 0: 1422 self.track( 1423 unscaled_value=self.unscale(-target), 1424 desc=desc, 1425 account=account, 1426 logging=False, 1427 created=created, 1428 ) 1429 ages.append((created, target)) 1430 if no_lock: 1431 self.free(self.lock()) 1432 return created, ages 1433 1434 def transfer(self, unscaled_amount: float | int | Decimal, from_account: str, to_account: str, desc: str = '', 1435 created: int = None, 1436 debug: bool = False) -> list[int]: 1437 """ 1438 Transfers a specified value from one account to another. 1439 1440 Parameters: 1441 unscaled_amount (float | int | Decimal): The amount to be transferred. 1442 from_account (str): The account from which the value will be transferred. 1443 to_account (str): The account to which the value will be transferred. 1444 desc (str, optional): A description for the transaction. Defaults to an empty string. 1445 created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. 1446 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1447 1448 Returns: 1449 list[int]: A list of timestamps corresponding to the transactions made during the transfer. 1450 1451 Raises: 1452 ValueError: Transfer to the same account is forbidden. 1453 ValueError: The box transaction happened again in the same nanosecond time. 1454 ValueError: The log transaction happened again in the same nanosecond time. 1455 """ 1456 if debug: 1457 print('transfer', f'debug={debug}') 1458 if from_account == to_account: 1459 raise ValueError(f'Transfer to the same account is forbidden. {to_account}') 1460 if unscaled_amount <= 0: 1461 return [] 1462 if created is None: 1463 created = self.time() 1464 (_, ages) = self.sub(unscaled_amount, desc, from_account, created, debug=debug) 1465 times = [] 1466 source_exchange = self.exchange(from_account, created) 1467 target_exchange = self.exchange(to_account, created) 1468 1469 if debug: 1470 print('ages', ages) 1471 1472 for age, value in ages: 1473 target_amount = int(self.exchange_calc(value, source_exchange['rate'], target_exchange['rate'])) 1474 if debug: 1475 print('target_amount', target_amount) 1476 # Perform the transfer 1477 if self.box_exists(to_account, age): 1478 if debug: 1479 print('box_exists', age) 1480 capital = self._vault['account'][to_account]['box'][age]['capital'] 1481 rest = self._vault['account'][to_account]['box'][age]['rest'] 1482 if debug: 1483 print( 1484 f"Transfer(loop) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1485 selected_age = age 1486 if rest + target_amount > capital: 1487 self._vault['account'][to_account]['box'][age]['capital'] += target_amount 1488 selected_age = ZakatTracker.time() 1489 self._vault['account'][to_account]['box'][age]['rest'] += target_amount 1490 self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount) 1491 y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account, 1492 created=None, ref=None, debug=debug) 1493 times.append((age, y)) 1494 continue 1495 if debug: 1496 print( 1497 f"Transfer(func) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1498 y = self.track( 1499 unscaled_value=self.unscale(int(target_amount)), 1500 desc=desc, 1501 account=to_account, 1502 logging=True, 1503 created=age, 1504 debug=debug, 1505 ) 1506 times.append(y) 1507 return times 1508 1509 def check(self, silver_gram_price: float, unscaled_nisab: float | int | Decimal = None, debug: bool = False, now: int = None, 1510 cycle: float = None) -> tuple: 1511 """ 1512 Check the eligibility for Zakat based on the given parameters. 1513 1514 Parameters: 1515 silver_gram_price (float): The price of a gram of silver. 1516 unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, 1517 it will be calculated based on the silver_gram_price. 1518 debug (bool): Flag to enable debug mode. 1519 now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). 1520 cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle(). 1521 1522 Returns: 1523 tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, 1524 and a dictionary containing the Zakat plan. 1525 """ 1526 if debug: 1527 print('check', f'debug={debug}') 1528 if now is None: 1529 now = self.time() 1530 if cycle is None: 1531 cycle = ZakatTracker.TimeCycle() 1532 if unscaled_nisab is None: 1533 unscaled_nisab = ZakatTracker.Nisab(silver_gram_price) 1534 nisab = self.scale(unscaled_nisab) 1535 plan = {} 1536 below_nisab = 0 1537 brief = [0, 0, 0] 1538 valid = False 1539 if debug: 1540 print('exchanges', self.exchanges()) 1541 for x in self._vault['account']: 1542 if not self.zakatable(x): 1543 continue 1544 _box = self._vault['account'][x]['box'] 1545 _log = self._vault['account'][x]['log'] 1546 limit = len(_box) + 1 1547 ids = sorted(self._vault['account'][x]['box'].keys()) 1548 for i in range(-1, -limit, -1): 1549 j = ids[i] 1550 rest = float(_box[j]['rest']) 1551 if rest <= 0: 1552 continue 1553 exchange = self.exchange(x, created=self.time()) 1554 rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1) 1555 brief[0] += rest 1556 index = limit + i - 1 1557 epoch = (now - j) / cycle 1558 if debug: 1559 print(f"Epoch: {epoch}", _box[j]) 1560 if _box[j]['last'] > 0: 1561 epoch = (now - _box[j]['last']) / cycle 1562 if debug: 1563 print(f"Epoch: {epoch}") 1564 epoch = floor(epoch) 1565 if debug: 1566 print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch) 1567 if epoch == 0: 1568 continue 1569 if debug: 1570 print("Epoch - PASSED") 1571 brief[1] += rest 1572 if rest >= nisab: 1573 total = 0 1574 for _ in range(epoch): 1575 total += ZakatTracker.ZakatCut(float(rest) - float(total)) 1576 if total > 0: 1577 if x not in plan: 1578 plan[x] = {} 1579 valid = True 1580 brief[2] += total 1581 plan[x][index] = { 1582 'total': total, 1583 'count': epoch, 1584 'box_time': j, 1585 'box_capital': _box[j]['capital'], 1586 'box_rest': _box[j]['rest'], 1587 'box_last': _box[j]['last'], 1588 'box_total': _box[j]['total'], 1589 'box_count': _box[j]['count'], 1590 'box_log': _log[j]['desc'], 1591 'exchange_rate': exchange['rate'], 1592 'exchange_time': exchange['time'], 1593 'exchange_desc': exchange['description'], 1594 } 1595 else: 1596 chunk = ZakatTracker.ZakatCut(float(rest)) 1597 if chunk > 0: 1598 if x not in plan: 1599 plan[x] = {} 1600 if j not in plan[x].keys(): 1601 plan[x][index] = {} 1602 below_nisab += rest 1603 brief[2] += chunk 1604 plan[x][index]['below_nisab'] = chunk 1605 plan[x][index]['total'] = chunk 1606 plan[x][index]['count'] = epoch 1607 plan[x][index]['box_time'] = j 1608 plan[x][index]['box_capital'] = _box[j]['capital'] 1609 plan[x][index]['box_rest'] = _box[j]['rest'] 1610 plan[x][index]['box_last'] = _box[j]['last'] 1611 plan[x][index]['box_total'] = _box[j]['total'] 1612 plan[x][index]['box_count'] = _box[j]['count'] 1613 plan[x][index]['box_log'] = _log[j]['desc'] 1614 plan[x][index]['exchange_rate'] = exchange['rate'] 1615 plan[x][index]['exchange_time'] = exchange['time'] 1616 plan[x][index]['exchange_desc'] = exchange['description'] 1617 valid = valid or below_nisab >= nisab 1618 if debug: 1619 print(f"below_nisab({below_nisab}) >= nisab({nisab})") 1620 return valid, brief, plan 1621 1622 def build_payment_parts(self, demand: float, positive_only: bool = True) -> dict: 1623 """ 1624 Build payment parts for the Zakat distribution. 1625 1626 Parameters: 1627 demand (float): The total demand for payment in local currency. 1628 positive_only (bool): If True, only consider accounts with positive balance. Default is True. 1629 1630 Returns: 1631 dict: A dictionary containing the payment parts for each account. The dictionary has the following structure: 1632 { 1633 'account': { 1634 'account_id': {'balance': float, 'rate': float, 'part': float}, 1635 ... 1636 }, 1637 'exceed': bool, 1638 'demand': float, 1639 'total': float, 1640 } 1641 """ 1642 total = 0 1643 parts = { 1644 'account': {}, 1645 'exceed': False, 1646 'demand': demand, 1647 } 1648 for x, y in self.accounts().items(): 1649 if positive_only and y <= 0: 1650 continue 1651 total += float(y) 1652 exchange = self.exchange(x) 1653 parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0} 1654 parts['total'] = total 1655 return parts 1656 1657 @staticmethod 1658 def check_payment_parts(parts: dict, debug: bool = False) -> int: 1659 """ 1660 Checks the validity of payment parts. 1661 1662 Parameters: 1663 parts (dict): A dictionary containing payment parts information. 1664 debug (bool): Flag to enable debug mode. 1665 1666 Returns: 1667 int: Returns 0 if the payment parts are valid, otherwise returns the error code. 1668 1669 Error Codes: 1670 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 1671 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 1672 3: 'part' value in parts['account'][x] is less than 0. 1673 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 1674 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 1675 6: The sum of 'part' values in parts['account'] does not match with 'demand' value. 1676 """ 1677 if debug: 1678 print('check_payment_parts', f'debug={debug}') 1679 for i in ['demand', 'account', 'total', 'exceed']: 1680 if i not in parts: 1681 return 1 1682 exceed = parts['exceed'] 1683 for x in parts['account']: 1684 for j in ['balance', 'rate', 'part']: 1685 if j not in parts['account'][x]: 1686 return 2 1687 if parts['account'][x]['part'] < 0: 1688 return 3 1689 if not exceed and parts['account'][x]['balance'] <= 0: 1690 return 4 1691 demand = parts['demand'] 1692 z = 0 1693 for _, y in parts['account'].items(): 1694 if not exceed and y['part'] > y['balance']: 1695 return 5 1696 z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1) 1697 z = round(z, 2) 1698 demand = round(demand, 2) 1699 if debug: 1700 print('check_payment_parts', f'z = {z}, demand = {demand}') 1701 print('check_payment_parts', type(z), type(demand)) 1702 print('check_payment_parts', z != demand) 1703 print('check_payment_parts', str(z) != str(demand)) 1704 if z != demand and str(z) != str(demand): 1705 return 6 1706 return 0 1707 1708 def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool: 1709 """ 1710 Perform Zakat calculation based on the given report and optional parts. 1711 1712 Parameters: 1713 report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. 1714 parts (dict): A dictionary containing the payment parts for the zakat. 1715 debug (bool): A flag indicating whether to print debug information. 1716 1717 Returns: 1718 bool: True if the zakat calculation is successful, False otherwise. 1719 """ 1720 if debug: 1721 print('zakat', f'debug={debug}') 1722 valid, _, plan = report 1723 if not valid: 1724 return valid 1725 parts_exist = parts is not None 1726 if parts_exist: 1727 if self.check_payment_parts(parts, debug=debug) != 0: 1728 return False 1729 if debug: 1730 print('######### zakat #######') 1731 print('parts_exist', parts_exist) 1732 no_lock = self.nolock() 1733 self.lock() 1734 report_time = self.time() 1735 self._vault['report'][report_time] = report 1736 self._step(Action.REPORT, ref=report_time) 1737 created = self.time() 1738 for x in plan: 1739 target_exchange = self.exchange(x) 1740 if debug: 1741 print(plan[x]) 1742 print('-------------') 1743 print(self._vault['account'][x]['box']) 1744 ids = sorted(self._vault['account'][x]['box'].keys()) 1745 if debug: 1746 print('plan[x]', plan[x]) 1747 for i in plan[x].keys(): 1748 j = ids[i] 1749 if debug: 1750 print('i', i, 'j', j) 1751 self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'], 1752 key='last', 1753 math_operation=MathOperation.EQUAL) 1754 self._vault['account'][x]['box'][j]['last'] = created 1755 amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate'])) 1756 self._vault['account'][x]['box'][j]['total'] += amount 1757 self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total', 1758 math_operation=MathOperation.ADDITION) 1759 self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count'] 1760 self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count', 1761 math_operation=MathOperation.ADDITION) 1762 if not parts_exist: 1763 try: 1764 self._vault['account'][x]['box'][j]['rest'] -= amount 1765 except TypeError: 1766 self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount) 1767 # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest', 1768 # math_operation=MathOperation.SUBTRACTION) 1769 self._log(-float(amount), desc='zakat-زكاة', account=x, created=None, ref=j, debug=debug) 1770 if parts_exist: 1771 for account, part in parts['account'].items(): 1772 if part['part'] == 0: 1773 continue 1774 if debug: 1775 print('zakat-part', account, part['rate']) 1776 target_exchange = self.exchange(account) 1777 amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate']) 1778 self.sub(amount, desc='zakat-part-دفعة-زكاة', account=account, debug=debug) 1779 if no_lock: 1780 self.free(self.lock()) 1781 return True 1782 1783 def export_json(self, path: str = "data.json") -> bool: 1784 """ 1785 Exports the current state of the ZakatTracker object to a JSON file. 1786 1787 Parameters: 1788 path (str): The path where the JSON file will be saved. Default is "data.json". 1789 1790 Returns: 1791 bool: True if the export is successful, False otherwise. 1792 1793 Raises: 1794 No specific exceptions are raised by this method. 1795 """ 1796 with open(path, "w") as file: 1797 json.dump(self._vault, file, indent=4, cls=JSONEncoder) 1798 return True 1799 1800 def save(self, path: str = None) -> bool: 1801 """ 1802 Saves the ZakatTracker's current state to a camel file. 1803 1804 This method serializes the internal data (`_vault`). 1805 1806 Parameters: 1807 path (str, optional): File path for saving. Defaults to a predefined location. 1808 1809 Returns: 1810 bool: True if the save operation is successful, False otherwise. 1811 """ 1812 if path is None: 1813 path = self.path() 1814 with open(f'{path}.tmp', 'w') as stream: 1815 # first save in tmp file 1816 stream.write(camel.dump(self._vault)) 1817 # then move tmp file to original location 1818 shutil.move(f'{path}.tmp', path) 1819 return True 1820 1821 def load(self, path: str = None) -> bool: 1822 """ 1823 Load the current state of the ZakatTracker object from a camel file. 1824 1825 Parameters: 1826 path (str): The path where the camel file is located. If not provided, it will use the default path. 1827 1828 Returns: 1829 bool: True if the load operation is successful, False otherwise. 1830 """ 1831 if path is None: 1832 path = self.path() 1833 if os.path.exists(path): 1834 with open(path, 'r') as stream: 1835 self._vault = camel.load(stream.read()) 1836 return True 1837 return False 1838 1839 def import_csv_cache_path(self): 1840 """ 1841 Generates the cache file path for imported CSV data. 1842 1843 This function constructs the file path where cached data from CSV imports 1844 will be stored. The cache file is a camel file (.camel extension) appended 1845 to the base path of the object. 1846 1847 Returns: 1848 str: The full path to the import CSV cache file. 1849 1850 Example: 1851 >>> obj = ZakatTracker('/data/reports') 1852 >>> obj.import_csv_cache_path() 1853 '/data/reports.import_csv.camel' 1854 """ 1855 path = str(self.path()) 1856 ext = self.ext() 1857 ext_len = len(ext) 1858 if path.endswith(f'.{ext}'): 1859 path = path[:-ext_len-1] 1860 return path + f'.import_csv.{ext}' 1861 1862 def import_csv(self, path: str = 'file.csv', debug: bool = False) -> tuple: 1863 """ 1864 The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system. 1865 1866 Parameters: 1867 path (str): The path to the CSV file. Default is 'file.csv'. 1868 debug (bool): A flag indicating whether to print debug information. 1869 1870 Returns: 1871 tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, 1872 and a dictionary of bad transactions. 1873 1874 Notes: 1875 * Currency Pair Assumption: This function assumes that the exchange rates stored for each account 1876 are appropriate for the currency pairs involved in the conversions. 1877 * The exchange rate for each account is based on the last encountered transaction rate that is not equal 1878 to 1.0 or the previous rate for that account. 1879 * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent 1880 transactions of the same account within the whole imported and existing dataset when doing `check` and 1881 `zakat` operations. 1882 1883 Example Usage: 1884 The CSV file should have the following format, rate is optional per transaction: 1885 account, desc, value, date, rate 1886 For example: 1887 safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1 1888 """ 1889 if debug: 1890 print('import_csv', f'debug={debug}') 1891 cache: list[int] = [] 1892 try: 1893 with open(self.import_csv_cache_path(), 'r') as stream: 1894 cache = camel.load(stream.read()) 1895 except: 1896 pass 1897 date_formats = [ 1898 "%Y-%m-%d %H:%M:%S", 1899 "%Y-%m-%dT%H:%M:%S", 1900 "%Y-%m-%dT%H%M%S", 1901 "%Y-%m-%d", 1902 ] 1903 created, found, bad = 0, 0, {} 1904 data: dict[int, list] = {} 1905 with open(path, newline='', encoding="utf-8") as f: 1906 i = 0 1907 for row in csv.reader(f, delimiter=','): 1908 i += 1 1909 hashed = hash(tuple(row)) 1910 if hashed in cache: 1911 found += 1 1912 continue 1913 account = row[0] 1914 desc = row[1] 1915 value = float(row[2]) 1916 rate = 1.0 1917 if row[4:5]: # Empty list if index is out of range 1918 rate = float(row[4]) 1919 date: int = 0 1920 for time_format in date_formats: 1921 try: 1922 date = self.time(datetime.datetime.strptime(row[3], time_format)) 1923 break 1924 except: 1925 pass 1926 # TODO: not allowed for negative dates in the future after enhance time functions 1927 if date == 0 or value == 0: 1928 bad[i] = row + ('invalid date',) 1929 continue 1930 if date not in data: 1931 data[date] = [] 1932 data[date].append((i, account, desc, value, date, rate, hashed)) 1933 1934 if debug: 1935 print('import_csv', len(data)) 1936 1937 if bad: 1938 return created, found, bad 1939 1940 for date, rows in sorted(data.items()): 1941 try: 1942 len_rows = len(rows) 1943 if len_rows == 1: 1944 (_, account, desc, value, date, rate, hashed) = rows[0] 1945 if rate > 0: 1946 self.exchange(account, created=date, rate=rate) 1947 if value > 0: 1948 self.track(value, desc, account, True, date) 1949 elif value < 0: 1950 self.sub(-value, desc, account, date) 1951 created += 1 1952 cache.append(hashed) 1953 continue 1954 if debug: 1955 print('-- Duplicated time detected', date, 'len', len_rows) 1956 print(rows) 1957 print('---------------------------------') 1958 # If records are found at the same time with different accounts in the same amount 1959 # (one positive and the other negative), this indicates it is a transfer. 1960 if len_rows != 2: 1961 raise Exception(f'more than two transactions({len_rows}) at the same time') 1962 (i, account1, desc1, value1, date1, rate1, _) = rows[0] 1963 (j, account2, desc2, value2, date2, rate2, _) = rows[1] 1964 if account1 == account2 or desc1 != desc2 or abs(value1) != abs(value2) or date1 != date2: 1965 raise Exception('invalid transfer') 1966 if rate1 > 0: 1967 self.exchange(account1, created=date1, rate=rate1) 1968 if rate2 > 0: 1969 self.exchange(account2, created=date2, rate=rate2) 1970 values = { 1971 value1: account1, 1972 value2: account2, 1973 } 1974 self.transfer( 1975 unscaled_amount=abs(value1), 1976 from_account=values[min(values.keys())], 1977 to_account=values[max(values.keys())], 1978 desc=desc1, 1979 created=date1, 1980 ) 1981 except Exception as e: 1982 for (i, account, desc, value, date, rate, _) in rows: 1983 bad[i] = (account, desc, value, date, rate, e) 1984 break 1985 with open(self.import_csv_cache_path(), 'w') as stream: 1986 stream.write(camel.dump(cache)) 1987 return created, found, bad 1988 1989 ######## 1990 # TESTS # 1991 ####### 1992 1993 @staticmethod 1994 def human_readable_size(size: float, decimal_places: int = 2) -> str: 1995 """ 1996 Converts a size in bytes to a human-readable format (e.g., KB, MB, GB). 1997 1998 This function iterates through progressively larger units of information 1999 (B, KB, MB, GB, etc.) and divides the input size until it fits within a 2000 range that can be expressed with a reasonable number before the unit. 2001 2002 Parameters: 2003 size (float): The size in bytes to convert. 2004 decimal_places (int, optional): The number of decimal places to display 2005 in the result. Defaults to 2. 2006 2007 Returns: 2008 str: A string representation of the size in a human-readable format, 2009 rounded to the specified number of decimal places. For example: 2010 - "1.50 KB" (1536 bytes) 2011 - "23.00 MB" (24117248 bytes) 2012 - "1.23 GB" (1325899906 bytes) 2013 """ 2014 if type(size) not in (float, int): 2015 raise TypeError("size must be a float or integer") 2016 if type(decimal_places) != int: 2017 raise TypeError("decimal_places must be an integer") 2018 for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: 2019 if size < 1024.0: 2020 break 2021 size /= 1024.0 2022 return f"{size:.{decimal_places}f} {unit}" 2023 2024 @staticmethod 2025 def get_dict_size(obj: dict, seen: set = None) -> float: 2026 """ 2027 Recursively calculates the approximate memory size of a dictionary and its contents in bytes. 2028 2029 This function traverses the dictionary structure, accounting for the size of keys, values, 2030 and any nested objects. It handles various data types commonly found in dictionaries 2031 (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case 2032 of circular references. 2033 2034 Parameters: 2035 obj (dict): The dictionary whose size is to be calculated. 2036 seen (set, optional): A set used internally to track visited objects 2037 and avoid circular references. Defaults to None. 2038 2039 Returns: 2040 float: An approximate size of the dictionary and its contents in bytes. 2041 2042 Note: 2043 - This function is a method of the `ZakatTracker` class and is likely used to 2044 estimate the memory footprint of data structures relevant to Zakat calculations. 2045 - The size calculation is approximate as it relies on `sys.getsizeof()`, which might 2046 not account for all memory overhead depending on the Python implementation. 2047 - Circular references are handled to prevent infinite recursion. 2048 - Basic numeric types (int, float, complex) are assumed to have fixed sizes. 2049 - String sizes are estimated based on character length and encoding. 2050 """ 2051 size = 0 2052 if seen is None: 2053 seen = set() 2054 2055 obj_id = id(obj) 2056 if obj_id in seen: 2057 return 0 2058 2059 seen.add(obj_id) 2060 size += sys.getsizeof(obj) 2061 2062 if isinstance(obj, dict): 2063 for k, v in obj.items(): 2064 size += ZakatTracker.get_dict_size(k, seen) 2065 size += ZakatTracker.get_dict_size(v, seen) 2066 elif isinstance(obj, (list, tuple, set, frozenset)): 2067 for item in obj: 2068 size += ZakatTracker.get_dict_size(item, seen) 2069 elif isinstance(obj, (int, float, complex)): # Handle numbers 2070 pass # Basic numbers have a fixed size, so nothing to add here 2071 elif isinstance(obj, str): # Handle strings 2072 size += len(obj) * sys.getsizeof(str().encode()) # Size per character in bytes 2073 return size 2074 2075 @staticmethod 2076 def duration_from_nanoseconds(ns: int, 2077 show_zeros_in_spoken_time: bool = False, 2078 spoken_time_separator=',', 2079 millennia: str = 'Millennia', 2080 century: str = 'Century', 2081 years: str = 'Years', 2082 days: str = 'Days', 2083 hours: str = 'Hours', 2084 minutes: str = 'Minutes', 2085 seconds: str = 'Seconds', 2086 milli_seconds: str = 'MilliSeconds', 2087 micro_seconds: str = 'MicroSeconds', 2088 nano_seconds: str = 'NanoSeconds', 2089 ) -> tuple: 2090 """ 2091 REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 2092 Convert NanoSeconds to Human Readable Time Format. 2093 A NanoSeconds is a unit of time in the International System of Units (SI) equal 2094 to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second. 2095 Its symbol is μs, sometimes simplified to us when Unicode is not available. 2096 A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond. 2097 2098 INPUT : ms (AKA: MilliSeconds) 2099 OUTPUT: tuple(string time_lapsed, string spoken_time) like format. 2100 OUTPUT Variables: time_lapsed, spoken_time 2101 2102 Example Input: duration_from_nanoseconds(ns) 2103 **"Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds"** 2104 Example Output: ('039:0001:047:325:05:02:03:456:789:012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds') 2105 duration_from_nanoseconds(1234567890123456789012) 2106 """ 2107 us, ns = divmod(ns, 1000) 2108 ms, us = divmod(us, 1000) 2109 s, ms = divmod(ms, 1000) 2110 m, s = divmod(s, 60) 2111 h, m = divmod(m, 60) 2112 d, h = divmod(h, 24) 2113 y, d = divmod(d, 365) 2114 c, y = divmod(y, 100) 2115 n, c = divmod(c, 10) 2116 time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}" 2117 spoken_time_part = [] 2118 if n > 0 or show_zeros_in_spoken_time: 2119 spoken_time_part.append(f"{n: 3d} {millennia}") 2120 if c > 0 or show_zeros_in_spoken_time: 2121 spoken_time_part.append(f"{c: 4d} {century}") 2122 if y > 0 or show_zeros_in_spoken_time: 2123 spoken_time_part.append(f"{y: 3d} {years}") 2124 if d > 0 or show_zeros_in_spoken_time: 2125 spoken_time_part.append(f"{d: 4d} {days}") 2126 if h > 0 or show_zeros_in_spoken_time: 2127 spoken_time_part.append(f"{h: 2d} {hours}") 2128 if m > 0 or show_zeros_in_spoken_time: 2129 spoken_time_part.append(f"{m: 2d} {minutes}") 2130 if s > 0 or show_zeros_in_spoken_time: 2131 spoken_time_part.append(f"{s: 2d} {seconds}") 2132 if ms > 0 or show_zeros_in_spoken_time: 2133 spoken_time_part.append(f"{ms: 3d} {milli_seconds}") 2134 if us > 0 or show_zeros_in_spoken_time: 2135 spoken_time_part.append(f"{us: 3d} {micro_seconds}") 2136 if ns > 0 or show_zeros_in_spoken_time: 2137 spoken_time_part.append(f"{ns: 3d} {nano_seconds}") 2138 return time_lapsed, spoken_time_separator.join(spoken_time_part) 2139 2140 @staticmethod 2141 def day_to_time(day: int, month: int = 6, year: int = 2024) -> int: # افتراض أن الشهر هو يونيو والسنة 2024 2142 """ 2143 Convert a specific day, month, and year into a timestamp. 2144 2145 Parameters: 2146 day (int): The day of the month. 2147 month (int): The month of the year. Default is 6 (June). 2148 year (int): The year. Default is 2024. 2149 2150 Returns: 2151 int: The timestamp representing the given day, month, and year. 2152 2153 Note: 2154 This method assumes the default month and year if not provided. 2155 """ 2156 return ZakatTracker.time(datetime.datetime(year, month, day)) 2157 2158 @staticmethod 2159 def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime: 2160 """ 2161 Generate a random date between two given dates. 2162 2163 Parameters: 2164 start_date (datetime.datetime): The start date from which to generate a random date. 2165 end_date (datetime.datetime): The end date until which to generate a random date. 2166 2167 Returns: 2168 datetime.datetime: A random date between the start_date and end_date. 2169 """ 2170 time_between_dates = end_date - start_date 2171 days_between_dates = time_between_dates.days 2172 random_number_of_days = random.randrange(days_between_dates) 2173 return start_date + datetime.timedelta(days=random_number_of_days) 2174 2175 @staticmethod 2176 def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False, 2177 debug: bool = False) -> int: 2178 """ 2179 Generate a random CSV file with specified parameters. 2180 2181 Parameters: 2182 path (str): The path where the CSV file will be saved. Default is "data.csv". 2183 count (int): The number of rows to generate in the CSV file. Default is 1000. 2184 with_rate (bool): If True, a random rate between 1.2% and 12% is added. Default is False. 2185 debug (bool): A flag indicating whether to print debug information. 2186 2187 Returns: 2188 None. The function generates a CSV file at the specified path with the given count of rows. 2189 Each row contains a randomly generated account, description, value, and date. 2190 The value is randomly generated between 1000 and 100000, 2191 and the date is randomly generated between 1950-01-01 and 2023-12-31. 2192 If the row number is not divisible by 13, the value is multiplied by -1. 2193 """ 2194 if debug: 2195 print('generate_random_csv_file', f'debug={debug}') 2196 i = 0 2197 with open(path, "w", newline="") as csvfile: 2198 writer = csv.writer(csvfile) 2199 for i in range(count): 2200 account = f"acc-{random.randint(1, 1000)}" 2201 desc = f"Some text {random.randint(1, 1000)}" 2202 value = random.randint(1000, 100000) 2203 date = ZakatTracker.generate_random_date(datetime.datetime(1000, 1, 1), 2204 datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S") 2205 if not i % 13 == 0: 2206 value *= -1 2207 row = [account, desc, value, date] 2208 if with_rate: 2209 rate = random.randint(1, 100) * 0.12 2210 if debug: 2211 print('before-append', row) 2212 row.append(rate) 2213 if debug: 2214 print('after-append', row) 2215 writer.writerow(row) 2216 i = i + 1 2217 return i 2218 2219 @staticmethod 2220 def create_random_list(max_sum, min_value=0, max_value=10): 2221 """ 2222 Creates a list of random integers whose sum does not exceed the specified maximum. 2223 2224 Args: 2225 max_sum: The maximum allowed sum of the list elements. 2226 min_value: The minimum possible value for an element (inclusive). 2227 max_value: The maximum possible value for an element (inclusive). 2228 2229 Returns: 2230 A list of random integers. 2231 """ 2232 result = [] 2233 current_sum = 0 2234 2235 while current_sum < max_sum: 2236 # Calculate the remaining space for the next element 2237 remaining_sum = max_sum - current_sum 2238 # Determine the maximum possible value for the next element 2239 next_max_value = min(remaining_sum, max_value) 2240 # Generate a random element within the allowed range 2241 next_element = random.randint(min_value, next_max_value) 2242 result.append(next_element) 2243 current_sum += next_element 2244 2245 return result 2246 2247 def _test_core(self, restore=False, debug=False): 2248 2249 if debug: 2250 random.seed(1234567890) 2251 2252 # sanity check - random forward time 2253 2254 xlist = [] 2255 limit = 1000 2256 for _ in range(limit): 2257 y = ZakatTracker.time() 2258 z = '-' 2259 if y not in xlist: 2260 xlist.append(y) 2261 else: 2262 z = 'x' 2263 if debug: 2264 print(z, y) 2265 xx = len(xlist) 2266 if debug: 2267 print('count', xx, ' - unique: ', (xx / limit) * 100, '%') 2268 assert limit == xx 2269 2270 # sanity check - convert date since 1000AD 2271 2272 for year in range(1000, 9000): 2273 ns = ZakatTracker.time(datetime.datetime.strptime(f"{year}-12-30 18:30:45", "%Y-%m-%d %H:%M:%S")) 2274 date = ZakatTracker.time_to_datetime(ns) 2275 if debug: 2276 print(date) 2277 assert date.year == year 2278 assert date.month == 12 2279 assert date.day == 30 2280 assert date.hour == 18 2281 assert date.minute == 30 2282 assert date.second in [44, 45] 2283 2284 # human_readable_size 2285 2286 assert ZakatTracker.human_readable_size(0) == "0.00 B" 2287 assert ZakatTracker.human_readable_size(512) == "512.00 B" 2288 assert ZakatTracker.human_readable_size(1023) == "1023.00 B" 2289 2290 assert ZakatTracker.human_readable_size(1024) == "1.00 KB" 2291 assert ZakatTracker.human_readable_size(2048) == "2.00 KB" 2292 assert ZakatTracker.human_readable_size(5120) == "5.00 KB" 2293 2294 assert ZakatTracker.human_readable_size(1024 ** 2) == "1.00 MB" 2295 assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2) == "2.50 MB" 2296 2297 assert ZakatTracker.human_readable_size(1024 ** 3) == "1.00 GB" 2298 assert ZakatTracker.human_readable_size(1024 ** 4) == "1.00 TB" 2299 assert ZakatTracker.human_readable_size(1024 ** 5) == "1.00 PB" 2300 2301 assert ZakatTracker.human_readable_size(1536, decimal_places=0) == "2 KB" 2302 assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2, decimal_places=1) == "2.5 MB" 2303 assert ZakatTracker.human_readable_size(1234567890, decimal_places=3) == "1.150 GB" 2304 2305 try: 2306 ZakatTracker.human_readable_size("not a number") 2307 assert False, "Expected TypeError for invalid input" 2308 except TypeError: 2309 pass 2310 2311 try: 2312 ZakatTracker.human_readable_size(1024, decimal_places="not an int") 2313 assert False, "Expected TypeError for invalid decimal_places" 2314 except TypeError: 2315 pass 2316 2317 # get_dict_size 2318 assert ZakatTracker.get_dict_size({}) == sys.getsizeof({}), "Empty dictionary size mismatch" 2319 assert ZakatTracker.get_dict_size({"a": 1, "b": 2.5, "c": True}) != sys.getsizeof({}), "Not Empty dictionary" 2320 2321 # number scale 2322 error = 0 2323 total = 0 2324 for sign in ['', '-']: 2325 for max_i, max_j, decimal_places in [ 2326 (101, 101, 2), # fiat currency minimum unit took 2 decimal places 2327 (1, 1_000, 8), # cryptocurrency like Satoshi in Bitcoin took 8 decimal places 2328 (1, 1_000, 18) # cryptocurrency like Wei in Ethereum took 18 decimal places 2329 ]: 2330 for return_type in ( 2331 float, 2332 Decimal, 2333 ): 2334 for i in range(max_i): 2335 for j in range(max_j): 2336 total += 1 2337 num_str = f'{sign}{i}.{j:0{decimal_places}d}' 2338 num = return_type(num_str) 2339 scaled = self.scale(num, decimal_places=decimal_places) 2340 unscaled = self.unscale(scaled, return_type=return_type, decimal_places=decimal_places) 2341 if debug: 2342 print( 2343 f'return_type: {return_type}, num_str: {num_str} - num: {num} - scaled: {scaled} - unscaled: {unscaled}') 2344 if unscaled != num: 2345 if debug: 2346 print('***** SCALE ERROR *****') 2347 error += 1 2348 if debug: 2349 print(f'total: {total}, error({error}): {100 * error / total}%') 2350 assert error == 0 2351 2352 assert self.nolock() 2353 assert self._history() is True 2354 2355 table = { 2356 1: [ 2357 (0, 10, 1000, 1000, 1000, 1, 1), 2358 (0, 20, 3000, 3000, 3000, 2, 2), 2359 (0, 30, 6000, 6000, 6000, 3, 3), 2360 (1, 15, 4500, 4500, 4500, 3, 4), 2361 (1, 50, -500, -500, -500, 4, 5), 2362 (1, 100, -10500, -10500, -10500, 5, 6), 2363 ], 2364 'wallet': [ 2365 (1, 90, -9000, -9000, -9000, 1, 1), 2366 (0, 100, 1000, 1000, 1000, 2, 2), 2367 (1, 190, -18000, -18000, -18000, 3, 3), 2368 (0, 1000, 82000, 82000, 82000, 4, 4), 2369 ], 2370 } 2371 for x in table: 2372 for y in table[x]: 2373 self.lock() 2374 if y[0] == 0: 2375 ref = self.track( 2376 unscaled_value=y[1], 2377 desc='test-add', 2378 account=x, 2379 logging=True, 2380 created=ZakatTracker.time(), 2381 debug=debug, 2382 ) 2383 else: 2384 (ref, z) = self.sub( 2385 unscaled_value=y[1], 2386 desc='test-sub', 2387 account=x, 2388 created=ZakatTracker.time(), 2389 ) 2390 if debug: 2391 print('_sub', z, ZakatTracker.time()) 2392 assert ref != 0 2393 assert len(self._vault['account'][x]['log'][ref]['file']) == 0 2394 for i in range(3): 2395 file_ref = self.add_file(x, ref, 'file_' + str(i)) 2396 sleep(0.0000001) 2397 assert file_ref != 0 2398 if debug: 2399 print('ref', ref, 'file', file_ref) 2400 assert len(self._vault['account'][x]['log'][ref]['file']) == i + 1 2401 file_ref = self.add_file(x, ref, 'file_' + str(3)) 2402 assert self.remove_file(x, ref, file_ref) 2403 z = self.balance(x) 2404 if debug: 2405 print("debug-0", z, y) 2406 assert z == y[2] 2407 z = self.balance(x, False) 2408 if debug: 2409 print("debug-1", z, y[3]) 2410 assert z == y[3] 2411 o = self._vault['account'][x]['log'] 2412 z = 0 2413 for i in o: 2414 z += o[i]['value'] 2415 if debug: 2416 print("debug-2", z, type(z)) 2417 print("debug-2", y[4], type(y[4])) 2418 assert z == y[4] 2419 if debug: 2420 print('debug-2 - PASSED') 2421 assert self.box_size(x) == y[5] 2422 assert self.log_size(x) == y[6] 2423 assert not self.nolock() 2424 self.free(self.lock()) 2425 assert self.nolock() 2426 assert self.boxes(x) != {} 2427 assert self.logs(x) != {} 2428 2429 assert not self.hide(x) 2430 assert self.hide(x, False) is False 2431 assert self.hide(x) is False 2432 assert self.hide(x, True) 2433 assert self.hide(x) 2434 2435 assert self.zakatable(x) 2436 assert self.zakatable(x, False) is False 2437 assert self.zakatable(x) is False 2438 assert self.zakatable(x, True) 2439 assert self.zakatable(x) 2440 2441 if restore is True: 2442 count = len(self._vault['history']) 2443 if debug: 2444 print('history-count', count) 2445 assert count == 10 2446 # try mode 2447 for _ in range(count): 2448 assert self.recall(True, debug) 2449 count = len(self._vault['history']) 2450 if debug: 2451 print('history-count', count) 2452 assert count == 10 2453 _accounts = list(table.keys()) 2454 accounts_limit = len(_accounts) + 1 2455 for i in range(-1, -accounts_limit, -1): 2456 account = _accounts[i] 2457 if debug: 2458 print(account, len(table[account])) 2459 transaction_limit = len(table[account]) + 1 2460 for j in range(-1, -transaction_limit, -1): 2461 row = table[account][j] 2462 if debug: 2463 print(row, self.balance(account), self.balance(account, False)) 2464 assert self.balance(account) == self.balance(account, False) 2465 assert self.balance(account) == row[2] 2466 assert self.recall(False, debug) 2467 assert self.recall(False, debug) is False 2468 count = len(self._vault['history']) 2469 if debug: 2470 print('history-count', count) 2471 assert count == 0 2472 self.reset() 2473 2474 def test(self, debug: bool = False) -> bool: 2475 if debug: 2476 print('test', f'debug={debug}') 2477 try: 2478 2479 self._test_core(True, debug) 2480 self._test_core(False, debug) 2481 2482 assert self._history() 2483 2484 # Not allowed for duplicate transactions in the same account and time 2485 2486 created = ZakatTracker.time() 2487 self.track(100, 'test-1', 'same', True, created) 2488 failed = False 2489 try: 2490 self.track(50, 'test-1', 'same', True, created) 2491 except: 2492 failed = True 2493 assert failed is True 2494 2495 self.reset() 2496 2497 # Same account transfer 2498 for x in [1, 'a', True, 1.8, None]: 2499 failed = False 2500 try: 2501 self.transfer(1, x, x, 'same-account', debug=debug) 2502 except: 2503 failed = True 2504 assert failed is True 2505 2506 # Always preserve box age during transfer 2507 2508 series: list[tuple] = [ 2509 (30, 4), 2510 (60, 3), 2511 (90, 2), 2512 ] 2513 case = { 2514 3000: { 2515 'series': series, 2516 'rest': 15000, 2517 }, 2518 6000: { 2519 'series': series, 2520 'rest': 12000, 2521 }, 2522 9000: { 2523 'series': series, 2524 'rest': 9000, 2525 }, 2526 18000: { 2527 'series': series, 2528 'rest': 0, 2529 }, 2530 27000: { 2531 'series': series, 2532 'rest': -9000, 2533 }, 2534 36000: { 2535 'series': series, 2536 'rest': -18000, 2537 }, 2538 } 2539 2540 selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle() 2541 2542 for total in case: 2543 if debug: 2544 print('--------------------------------------------------------') 2545 print(f'case[{total}]', case[total]) 2546 for x in case[total]['series']: 2547 self.track( 2548 unscaled_value=x[0], 2549 desc=f"test-{x} ages", 2550 account='ages', 2551 logging=True, 2552 created=selected_time * x[1], 2553 ) 2554 2555 unscaled_total = self.unscale(total) 2556 if debug: 2557 print('unscaled_total', unscaled_total) 2558 refs = self.transfer( 2559 unscaled_amount=unscaled_total, 2560 from_account='ages', 2561 to_account='future', 2562 desc='Zakat Movement', 2563 debug=debug, 2564 ) 2565 2566 if debug: 2567 print('refs', refs) 2568 2569 ages_cache_balance = self.balance('ages') 2570 ages_fresh_balance = self.balance('ages', False) 2571 rest = case[total]['rest'] 2572 if debug: 2573 print('source', ages_cache_balance, ages_fresh_balance, rest) 2574 assert ages_cache_balance == rest 2575 assert ages_fresh_balance == rest 2576 2577 future_cache_balance = self.balance('future') 2578 future_fresh_balance = self.balance('future', False) 2579 if debug: 2580 print('target', future_cache_balance, future_fresh_balance, total) 2581 print('refs', refs) 2582 assert future_cache_balance == total 2583 assert future_fresh_balance == total 2584 2585 # TODO: check boxes times for `ages` should equal box times in `future` 2586 for ref in self._vault['account']['ages']['box']: 2587 ages_capital = self._vault['account']['ages']['box'][ref]['capital'] 2588 ages_rest = self._vault['account']['ages']['box'][ref]['rest'] 2589 future_capital = 0 2590 if ref in self._vault['account']['future']['box']: 2591 future_capital = self._vault['account']['future']['box'][ref]['capital'] 2592 future_rest = 0 2593 if ref in self._vault['account']['future']['box']: 2594 future_rest = self._vault['account']['future']['box'][ref]['rest'] 2595 if ages_capital != 0 and future_capital != 0 and future_rest != 0: 2596 if debug: 2597 print('================================================================') 2598 print('ages', ages_capital, ages_rest) 2599 print('future', future_capital, future_rest) 2600 if ages_rest == 0: 2601 assert ages_capital == future_capital 2602 elif ages_rest < 0: 2603 assert -ages_capital == future_capital 2604 elif ages_rest > 0: 2605 assert ages_capital == ages_rest + future_capital 2606 self.reset() 2607 assert len(self._vault['history']) == 0 2608 2609 assert self._history() 2610 assert self._history(False) is False 2611 assert self._history() is False 2612 assert self._history(True) 2613 assert self._history() 2614 if debug: 2615 print('####################################################################') 2616 2617 transaction = [ 2618 ( 2619 20, 'wallet', 1, -2000, -2000, -2000, 1, 1, 2620 2000, 2000, 2000, 1, 1, 2621 ), 2622 ( 2623 750, 'wallet', 'safe', -77000, -77000, -77000, 2, 2, 2624 75000, 75000, 75000, 1, 1, 2625 ), 2626 ( 2627 600, 'safe', 'bank', 15000, 15000, 15000, 1, 2, 2628 60000, 60000, 60000, 1, 1, 2629 ), 2630 ] 2631 for z in transaction: 2632 self.lock() 2633 x = z[1] 2634 y = z[2] 2635 self.transfer( 2636 unscaled_amount=z[0], 2637 from_account=x, 2638 to_account=y, 2639 desc='test-transfer', 2640 debug=debug, 2641 ) 2642 zz = self.balance(x) 2643 if debug: 2644 print(zz, z) 2645 assert zz == z[3] 2646 xx = self.accounts()[x] 2647 assert xx == z[3] 2648 assert self.balance(x, False) == z[4] 2649 assert xx == z[4] 2650 2651 s = 0 2652 log = self._vault['account'][x]['log'] 2653 for i in log: 2654 s += log[i]['value'] 2655 if debug: 2656 print('s', s, 'z[5]', z[5]) 2657 assert s == z[5] 2658 2659 assert self.box_size(x) == z[6] 2660 assert self.log_size(x) == z[7] 2661 2662 yy = self.accounts()[y] 2663 assert self.balance(y) == z[8] 2664 assert yy == z[8] 2665 assert self.balance(y, False) == z[9] 2666 assert yy == z[9] 2667 2668 s = 0 2669 log = self._vault['account'][y]['log'] 2670 for i in log: 2671 s += log[i]['value'] 2672 assert s == z[10] 2673 2674 assert self.box_size(y) == z[11] 2675 assert self.log_size(y) == z[12] 2676 assert self.free(self.lock()) 2677 2678 if debug: 2679 pp().pprint(self.check(2.17)) 2680 2681 assert not self.nolock() 2682 history_count = len(self._vault['history']) 2683 if debug: 2684 print('history-count', history_count) 2685 assert history_count == 4 2686 assert not self.free(ZakatTracker.time()) 2687 assert self.free(self.lock()) 2688 assert self.nolock() 2689 assert len(self._vault['history']) == 3 2690 2691 # storage 2692 2693 _path = self.path(f'test.{self.ext()}') 2694 if os.path.exists(_path): 2695 os.remove(_path) 2696 self.save() 2697 assert os.path.getsize(_path) > 0 2698 self.reset() 2699 assert self.recall(False, debug) is False 2700 self.load() 2701 assert self._vault['account'] is not None 2702 2703 # recall 2704 2705 assert self.nolock() 2706 assert len(self._vault['history']) == 3 2707 assert self.recall(False, debug) is True 2708 assert len(self._vault['history']) == 2 2709 assert self.recall(False, debug) is True 2710 assert len(self._vault['history']) == 1 2711 assert self.recall(False, debug) is True 2712 assert len(self._vault['history']) == 0 2713 assert self.recall(False, debug) is False 2714 assert len(self._vault['history']) == 0 2715 2716 # exchange 2717 2718 self.exchange("cash", 25, 3.75, "2024-06-25") 2719 self.exchange("cash", 22, 3.73, "2024-06-22") 2720 self.exchange("cash", 15, 3.69, "2024-06-15") 2721 self.exchange("cash", 10, 3.66) 2722 2723 for i in range(1, 30): 2724 exchange = self.exchange("cash", i) 2725 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2726 if debug: 2727 print(i, rate, description, created) 2728 assert created 2729 if i < 10: 2730 assert rate == 1 2731 assert description is None 2732 elif i == 10: 2733 assert rate == 3.66 2734 assert description is None 2735 elif i < 15: 2736 assert rate == 3.66 2737 assert description is None 2738 elif i == 15: 2739 assert rate == 3.69 2740 assert description is not None 2741 elif i < 22: 2742 assert rate == 3.69 2743 assert description is not None 2744 elif i == 22: 2745 assert rate == 3.73 2746 assert description is not None 2747 elif i >= 25: 2748 assert rate == 3.75 2749 assert description is not None 2750 exchange = self.exchange("bank", i) 2751 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2752 if debug: 2753 print(i, rate, description, created) 2754 assert created 2755 assert rate == 1 2756 assert description is None 2757 2758 assert len(self._vault['exchange']) > 0 2759 assert len(self.exchanges()) > 0 2760 self._vault['exchange'].clear() 2761 assert len(self._vault['exchange']) == 0 2762 assert len(self.exchanges()) == 0 2763 2764 # حفظ أسعار الصرف باستخدام التواريخ بالنانو ثانية 2765 self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25") 2766 self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22") 2767 self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15") 2768 self.exchange("cash", ZakatTracker.day_to_time(10), 3.66) 2769 2770 for i in [x * 0.12 for x in range(-15, 21)]: 2771 if i <= 0: 2772 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0 2773 else: 2774 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0 2775 2776 # اختبار النتائج باستخدام التواريخ بالنانو ثانية 2777 for i in range(1, 31): 2778 timestamp_ns = ZakatTracker.day_to_time(i) 2779 exchange = self.exchange("cash", timestamp_ns) 2780 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2781 if debug: 2782 print(i, rate, description, created) 2783 assert created 2784 if i < 10: 2785 assert rate == 1 2786 assert description is None 2787 elif i == 10: 2788 assert rate == 3.66 2789 assert description is None 2790 elif i < 15: 2791 assert rate == 3.66 2792 assert description is None 2793 elif i == 15: 2794 assert rate == 3.69 2795 assert description is not None 2796 elif i < 22: 2797 assert rate == 3.69 2798 assert description is not None 2799 elif i == 22: 2800 assert rate == 3.73 2801 assert description is not None 2802 elif i >= 25: 2803 assert rate == 3.75 2804 assert description is not None 2805 exchange = self.exchange("bank", i) 2806 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2807 if debug: 2808 print(i, rate, description, created) 2809 assert created 2810 assert rate == 1 2811 assert description is None 2812 2813 # csv 2814 2815 csv_count = 1000 2816 2817 for with_rate, path in { 2818 False: 'test-import_csv-no-exchange', 2819 True: 'test-import_csv-with-exchange', 2820 }.items(): 2821 2822 if debug: 2823 print('test_import_csv', with_rate, path) 2824 2825 csv_path = path + '.csv' 2826 if os.path.exists(csv_path): 2827 os.remove(csv_path) 2828 c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug) 2829 if debug: 2830 print('generate_random_csv_file', c) 2831 assert c == csv_count 2832 assert os.path.getsize(csv_path) > 0 2833 cache_path = self.import_csv_cache_path() 2834 if os.path.exists(cache_path): 2835 os.remove(cache_path) 2836 self.reset() 2837 (created, found, bad) = self.import_csv(csv_path, debug) 2838 bad_count = len(bad) 2839 assert bad_count > 0 2840 if debug: 2841 print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})") 2842 print('bad', bad) 2843 tmp_size = os.path.getsize(cache_path) 2844 assert tmp_size > 0 2845 # TODO: assert created + found + bad_count == csv_count 2846 # TODO: assert created == csv_count 2847 # TODO: assert bad_count == 0 2848 (created_2, found_2, bad_2) = self.import_csv(csv_path) 2849 bad_2_count = len(bad_2) 2850 if debug: 2851 print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})") 2852 print('bad', bad) 2853 assert bad_2_count > 0 2854 # TODO: assert tmp_size == os.path.getsize(cache_path) 2855 # TODO: assert created_2 + found_2 + bad_2_count == csv_count 2856 # TODO: assert created == found_2 2857 # TODO: assert bad_count == bad_2_count 2858 # TODO: assert found_2 == csv_count 2859 # TODO: assert bad_2_count == 0 2860 # TODO: assert created_2 == 0 2861 2862 # payment parts 2863 2864 positive_parts = self.build_payment_parts(100, positive_only=True) 2865 assert self.check_payment_parts(positive_parts) != 0 2866 assert self.check_payment_parts(positive_parts) != 0 2867 all_parts = self.build_payment_parts(300, positive_only=False) 2868 assert self.check_payment_parts(all_parts) != 0 2869 assert self.check_payment_parts(all_parts) != 0 2870 if debug: 2871 pp().pprint(positive_parts) 2872 pp().pprint(all_parts) 2873 # dynamic discount 2874 suite = [] 2875 count = 3 2876 for exceed in [False, True]: 2877 case = [] 2878 for parts in [positive_parts, all_parts]: 2879 part = parts.copy() 2880 demand = part['demand'] 2881 if debug: 2882 print(demand, part['total']) 2883 i = 0 2884 z = demand / count 2885 cp = { 2886 'account': {}, 2887 'demand': demand, 2888 'exceed': exceed, 2889 'total': part['total'], 2890 } 2891 j = '' 2892 for x, y in part['account'].items(): 2893 x_exchange = self.exchange(x) 2894 zz = self.exchange_calc(z, 1, x_exchange['rate']) 2895 if exceed and zz <= demand: 2896 i += 1 2897 y['part'] = zz 2898 if debug: 2899 print(exceed, y) 2900 cp['account'][x] = y 2901 case.append(y) 2902 elif not exceed and y['balance'] >= zz: 2903 i += 1 2904 y['part'] = zz 2905 if debug: 2906 print(exceed, y) 2907 cp['account'][x] = y 2908 case.append(y) 2909 j = x 2910 if i >= count: 2911 break 2912 if len(cp['account'][j]) > 0: 2913 suite.append(cp) 2914 if debug: 2915 print('suite', len(suite)) 2916 # vault = self._vault.copy() 2917 for case in suite: 2918 # self._vault = vault.copy() 2919 if debug: 2920 print('case', case) 2921 result = self.check_payment_parts(case) 2922 if debug: 2923 print('check_payment_parts', result, f'exceed: {exceed}') 2924 assert result == 0 2925 2926 report = self.check(2.17, None, debug) 2927 (valid, brief, plan) = report 2928 if debug: 2929 print('valid', valid) 2930 zakat_result = self.zakat(report, parts=case, debug=debug) 2931 if debug: 2932 print('zakat-result', zakat_result) 2933 assert valid == zakat_result 2934 2935 assert self.save(path + f'.{self.ext()}') 2936 assert self.export_json(path + '.json') 2937 2938 assert self.export_json("1000-transactions-test.json") 2939 assert self.save(f"1000-transactions-test.{self.ext()}") 2940 2941 self.reset() 2942 2943 # test transfer between accounts with different exchange rate 2944 2945 a_SAR = "Bank (SAR)" 2946 b_USD = "Bank (USD)" 2947 c_SAR = "Safe (SAR)" 2948 # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer 2949 for case in [ 2950 (0, a_SAR, "SAR Gift", 1000, 100000), 2951 (1, a_SAR, 1), 2952 (0, b_USD, "USD Gift", 500, 50000), 2953 (1, b_USD, 1), 2954 (2, b_USD, 3.75), 2955 (1, b_USD, 3.75), 2956 (3, 100, b_USD, a_SAR, "100 USD -> SAR", 40000, 137500), 2957 (0, c_SAR, "Salary", 750, 75000), 2958 (3, 375, c_SAR, b_USD, "375 SAR -> USD", 37500, 50000), 2959 (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 137125, 50100), 2960 ]: 2961 if debug: 2962 print('case', case) 2963 match (case[0]): 2964 case 0: # track 2965 _, account, desc, x, balance = case 2966 self.track(unscaled_value=x, desc=desc, account=account, debug=debug) 2967 2968 cached_value = self.balance(account, cached=True) 2969 fresh_value = self.balance(account, cached=False) 2970 if debug: 2971 print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value) 2972 assert cached_value == balance 2973 assert fresh_value == balance 2974 case 1: # check-exchange 2975 _, account, expected_rate = case 2976 t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2977 if debug: 2978 print('t-exchange', t_exchange) 2979 assert t_exchange['rate'] == expected_rate 2980 case 2: # do-exchange 2981 _, account, rate = case 2982 self.exchange(account, rate=rate, debug=debug) 2983 b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2984 if debug: 2985 print('b-exchange', b_exchange) 2986 assert b_exchange['rate'] == rate 2987 case 3: # transfer 2988 _, x, a, b, desc, a_balance, b_balance = case 2989 self.transfer(x, a, b, desc, debug=debug) 2990 2991 cached_value = self.balance(a, cached=True) 2992 fresh_value = self.balance(a, cached=False) 2993 if debug: 2994 print('account', a, 'cached_value', cached_value, 'fresh_value', fresh_value, 'a_balance', a_balance) 2995 assert cached_value == a_balance 2996 assert fresh_value == a_balance 2997 2998 cached_value = self.balance(b, cached=True) 2999 fresh_value = self.balance(b, cached=False) 3000 if debug: 3001 print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value) 3002 assert cached_value == b_balance 3003 assert fresh_value == b_balance 3004 3005 # Transfer all in many chunks randomly from B to A 3006 a_SAR_balance = 137125 3007 b_USD_balance = 50100 3008 b_USD_exchange = self.exchange(b_USD) 3009 amounts = ZakatTracker.create_random_list(b_USD_balance, max_value=1000) 3010 if debug: 3011 print('amounts', amounts) 3012 i = 0 3013 for x in amounts: 3014 if debug: 3015 print(f'{i} - transfer-with-exchange({x})') 3016 self.transfer( 3017 unscaled_amount=self.unscale(x), 3018 from_account=b_USD, 3019 to_account=a_SAR, 3020 desc=f"{x} USD -> SAR", 3021 debug=debug, 3022 ) 3023 3024 b_USD_balance -= x 3025 cached_value = self.balance(b_USD, cached=True) 3026 fresh_value = self.balance(b_USD, cached=False) 3027 if debug: 3028 print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3029 b_USD_balance) 3030 assert cached_value == b_USD_balance 3031 assert fresh_value == b_USD_balance 3032 3033 a_SAR_balance += int(x * b_USD_exchange['rate']) 3034 cached_value = self.balance(a_SAR, cached=True) 3035 fresh_value = self.balance(a_SAR, cached=False) 3036 if debug: 3037 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3038 a_SAR_balance, 'rate', b_USD_exchange['rate']) 3039 assert cached_value == a_SAR_balance 3040 assert fresh_value == a_SAR_balance 3041 i += 1 3042 3043 # Transfer all in many chunks randomly from C to A 3044 c_SAR_balance = 37500 3045 amounts = ZakatTracker.create_random_list(c_SAR_balance, max_value=1000) 3046 if debug: 3047 print('amounts', amounts) 3048 i = 0 3049 for x in amounts: 3050 if debug: 3051 print(f'{i} - transfer-with-exchange({x})') 3052 self.transfer( 3053 unscaled_amount=self.unscale(x), 3054 from_account=c_SAR, 3055 to_account=a_SAR, 3056 desc=f"{x} SAR -> a_SAR", 3057 debug=debug, 3058 ) 3059 3060 c_SAR_balance -= x 3061 cached_value = self.balance(c_SAR, cached=True) 3062 fresh_value = self.balance(c_SAR, cached=False) 3063 if debug: 3064 print('account', c_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3065 c_SAR_balance) 3066 assert cached_value == c_SAR_balance 3067 assert fresh_value == c_SAR_balance 3068 3069 a_SAR_balance += x 3070 cached_value = self.balance(a_SAR, cached=True) 3071 fresh_value = self.balance(a_SAR, cached=False) 3072 if debug: 3073 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3074 a_SAR_balance) 3075 assert cached_value == a_SAR_balance 3076 assert fresh_value == a_SAR_balance 3077 i += 1 3078 3079 assert self.export_json("accounts-transfer-with-exchange-rates.json") 3080 assert self.save(f"accounts-transfer-with-exchange-rates.{self.ext()}") 3081 3082 # check & zakat with exchange rates for many cycles 3083 3084 for rate, values in { 3085 1: { 3086 'in': [1000, 2000, 10000], 3087 'exchanged': [100000, 200000, 1000000], 3088 'out': [2500, 5000, 73140], 3089 }, 3090 3.75: { 3091 'in': [200, 1000, 5000], 3092 'exchanged': [75000, 375000, 1875000], 3093 'out': [1875, 9375, 137138], 3094 }, 3095 }.items(): 3096 a, b, c = values['in'] 3097 m, n, o = values['exchanged'] 3098 x, y, z = values['out'] 3099 if debug: 3100 print('rate', rate, 'values', values) 3101 for case in [ 3102 (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3103 {'safe': {0: {'below_nisab': x}}}, 3104 ], False, m), 3105 (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3106 {'safe': {0: {'count': 1, 'total': y}}}, 3107 ], True, n), 3108 (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [ 3109 {'cave': {0: {'count': 3, 'total': z}}}, 3110 ], True, o), 3111 ]: 3112 if debug: 3113 print(f"############# check(rate: {rate}) #############") 3114 print('case', case) 3115 self.reset() 3116 self.exchange(account=case[1], created=case[2], rate=rate) 3117 self.track(unscaled_value=case[0], desc='test-check', account=case[1], logging=True, created=case[2]) 3118 assert self.snapshot() 3119 3120 # assert self.nolock() 3121 # history_size = len(self._vault['history']) 3122 # print('history_size', history_size) 3123 # assert history_size == 2 3124 assert self.lock() 3125 assert not self.nolock() 3126 report = self.check(2.17, None, debug) 3127 (valid, brief, plan) = report 3128 if debug: 3129 print('brief', brief) 3130 assert valid == case[4] 3131 assert case[5] == brief[0] 3132 assert case[5] == brief[1] 3133 3134 if debug: 3135 pp().pprint(plan) 3136 3137 for x in plan: 3138 assert case[1] == x 3139 if 'total' in case[3][0][x][0].keys(): 3140 assert case[3][0][x][0]['total'] == int(brief[2]) 3141 assert int(plan[x][0]['total']) == case[3][0][x][0]['total'] 3142 assert int(plan[x][0]['count']) == case[3][0][x][0]['count'] 3143 else: 3144 assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab'] 3145 if debug: 3146 pp().pprint(report) 3147 result = self.zakat(report, debug=debug) 3148 if debug: 3149 print('zakat-result', result, case[4]) 3150 assert result == case[4] 3151 report = self.check(2.17, None, debug) 3152 (valid, brief, plan) = report 3153 assert valid is False 3154 3155 history_size = len(self._vault['history']) 3156 if debug: 3157 print('history_size', history_size) 3158 assert history_size == 3 3159 assert not self.nolock() 3160 assert self.recall(False, debug) is False 3161 self.free(self.lock()) 3162 assert self.nolock() 3163 3164 for i in range(3, 0, -1): 3165 history_size = len(self._vault['history']) 3166 if debug: 3167 print('history_size', history_size) 3168 assert history_size == i 3169 assert self.recall(False, debug) is True 3170 3171 assert self.nolock() 3172 assert self.recall(False, debug) is False 3173 3174 history_size = len(self._vault['history']) 3175 if debug: 3176 print('history_size', history_size) 3177 assert history_size == 0 3178 3179 account_size = len(self._vault['account']) 3180 if debug: 3181 print('account_size', account_size) 3182 assert account_size == 0 3183 3184 report_size = len(self._vault['report']) 3185 if debug: 3186 print('report_size', report_size) 3187 assert report_size == 0 3188 3189 assert self.nolock() 3190 return True 3191 except: 3192 # pp().pprint(self._vault) 3193 assert self.export_json("test-snapshot.json") 3194 assert self.save(f"test-snapshot.{self.ext()}") 3195 raise
A class for tracking and calculating Zakat.
This class provides functionalities for recording transactions, calculating Zakat due, and managing account balances. It also offers features like importing transactions from CSV files, exporting data to JSON format, and saving/loading the tracker state.
The ZakatTracker
class is designed to handle both positive and negative transactions,
allowing for flexible tracking of financial activities related to Zakat. It also supports
the concept of a "Nisab" (minimum threshold for Zakat) and a "haul" (complete one year for Transaction) can calculate Zakat due
based on the current silver price.
The class uses a camel file as its database to persist the tracker state, ensuring data integrity across sessions. It also provides options for enabling or disabling history tracking, allowing users to choose their preferred level of detail.
In addition, the ZakatTracker
class includes various helper methods like
time
, time_to_datetime
, lock
, free
, recall
, export_json
,
and more. These methods provide additional functionalities and flexibility
for interacting with and managing the Zakat tracker.
Attributes: ZakatTracker.ZakatCut (function): A function to calculate the Zakat percentage. ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat. ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price. ZakatTracker.Version (function): The version of the ZakatTracker class.
Data Structure: The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data.
_vault (dict):
- account (dict):
- {account_number} (dict):
- balance (int): The current balance of the account.
- box (dict): A dictionary storing transaction details.
- {timestamp} (dict):
- capital (int): The initial amount of the transaction.
- count (int): The number of times Zakat has been calculated for this transaction.
- last (int): The timestamp of the last Zakat calculation.
- rest (int): The remaining amount after Zakat deductions and withdrawal.
- total (int): The total Zakat deducted from this transaction.
- count (int): The total number of transactions for the account.
- log (dict): A dictionary storing transaction logs.
- {timestamp} (dict):
- value (int): The transaction amount (positive or negative).
- desc (str): The description of the transaction.
- ref (int): The box reference (positive or None).
- file (dict): A dictionary storing file references associated with the transaction.
- hide (bool): Indicates whether the account is hidden or not.
- zakatable (bool): Indicates whether the account is subject to Zakat.
- exchange (dict):
- account (dict):
- {timestamps} (dict):
- rate (float): Exchange rate when compared to local currency.
- description (str): The description of the exchange rate.
- history (dict):
- {timestamp} (list): A list of dictionaries storing the history of actions performed.
- {action_dict} (dict):
- action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT).
- account (str): The account number associated with the action.
- ref (int): The reference number of the transaction.
- file (int): The reference number of the file (if applicable).
- key (str): The key associated with the action (e.g., 'rest', 'total').
- value (int): The value associated with the action.
- math (MathOperation): The mathematical operation performed (if applicable).
- lock (int or None): The timestamp indicating the current lock status (None if not locked).
- report (dict):
- {timestamp} (tuple): A tuple storing Zakat report details.
271 def __init__(self, db_path: str = "zakat.camel", history_mode: bool = True): 272 """ 273 Initialize ZakatTracker with database path and history mode. 274 275 Parameters: 276 db_path (str): The path to the database file. Default is "zakat.camel". 277 history_mode (bool): The mode for tracking history. Default is True. 278 279 Returns: 280 None 281 """ 282 self._base_path = None 283 self._vault_path = None 284 self._vault = None 285 self.reset() 286 self._history(history_mode) 287 self.path(db_path)
Initialize ZakatTracker with database path and history mode.
Parameters: db_path (str): The path to the database file. Default is "zakat.camel". history_mode (bool): The mode for tracking history. Default is True.
Returns: None
195 @staticmethod 196 def Version() -> str: 197 """ 198 Returns the current version of the software. 199 200 This function returns a string representing the current version of the software, 201 including major, minor, and patch version numbers in the format "X.Y.Z". 202 203 Returns: 204 str: The current version of the software. 205 """ 206 return '0.2.83'
Returns the current version of the software.
This function returns a string representing the current version of the software, including major, minor, and patch version numbers in the format "X.Y.Z".
Returns: str: The current version of the software.
208 @staticmethod 209 def ZakatCut(x: float) -> float: 210 """ 211 Calculates the Zakat amount due on an asset. 212 213 This function calculates the zakat amount due on a given asset value over one lunar year. 214 Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth 215 that exceeds a certain threshold (Nisab). 216 217 Parameters: 218 x: The total value of the asset on which Zakat is to be calculated. 219 220 Returns: 221 The amount of Zakat due on the asset, calculated as 2.5% of the asset's value. 222 """ 223 return 0.025 * x # Zakat Cut in one Lunar Year
Calculates the Zakat amount due on an asset.
This function calculates the zakat amount due on a given asset value over one lunar year. Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth that exceeds a certain threshold (Nisab).
Parameters: x: The total value of the asset on which Zakat is to be calculated.
Returns: The amount of Zakat due on the asset, calculated as 2.5% of the asset's value.
225 @staticmethod 226 def TimeCycle(days: int = 355) -> int: 227 """ 228 Calculates the approximate duration of a lunar year in nanoseconds. 229 230 This function calculates the approximate duration of a lunar year based on the given number of days. 231 It converts the given number of days into nanoseconds for use in high-precision timing applications. 232 233 Parameters: 234 days: The number of days in a lunar year. Defaults to 355, 235 which is an approximation of the average length of a lunar year. 236 237 Returns: 238 The approximate duration of a lunar year in nanoseconds. 239 """ 240 return int(60 * 60 * 24 * days * 1e9) # Lunar Year in nanoseconds
Calculates the approximate duration of a lunar year in nanoseconds.
This function calculates the approximate duration of a lunar year based on the given number of days. It converts the given number of days into nanoseconds for use in high-precision timing applications.
Parameters: days: The number of days in a lunar year. Defaults to 355, which is an approximation of the average length of a lunar year.
Returns: The approximate duration of a lunar year in nanoseconds.
242 @staticmethod 243 def Nisab(gram_price: float, gram_quantity: float = 595) -> float: 244 """ 245 Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram. 246 247 This function calculates the Nisab value, which is the minimum threshold of wealth, 248 that makes an individual liable for paying Zakat. 249 The Nisab value is determined by the equivalent value of a specific amount 250 of gold or silver (currently 595 grams in silver) in the local currency. 251 252 Parameters: 253 - gram_price (float): The price per gram of Nisab. 254 - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver. 255 256 Returns: 257 - float: The total value of Nisab based on the given price per gram. 258 """ 259 return gram_price * gram_quantity
Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram.
This function calculates the Nisab value, which is the minimum threshold of wealth, that makes an individual liable for paying Zakat. The Nisab value is determined by the equivalent value of a specific amount of gold or silver (currently 595 grams in silver) in the local currency.
Parameters:
- gram_price (float): The price per gram of Nisab.
- gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver.
Returns:
- float: The total value of Nisab based on the given price per gram.
261 @staticmethod 262 def ext() -> str: 263 """ 264 Returns the file extension used by the ZakatTracker class. 265 266 Returns: 267 str: The file extension used by the ZakatTracker class, which is 'camel'. 268 """ 269 return 'camel'
Returns the file extension used by the ZakatTracker class.
Returns: str: The file extension used by the ZakatTracker class, which is 'camel'.
289 def path(self, path: str = None) -> str: 290 """ 291 Set or get the path to the database file. 292 293 If no path is provided, the current path is returned. 294 If a path is provided, it is set as the new path. 295 The function also creates the necessary directories if the provided path is a file. 296 297 Parameters: 298 path (str): The new path to the database file. If not provided, the current path is returned. 299 300 Returns: 301 str: The current or new path to the database file. 302 """ 303 if path is None: 304 return self._vault_path 305 self._vault_path = Path(path).resolve() 306 base_path = Path(path).resolve() 307 if base_path.is_file() or base_path.suffix: 308 base_path = base_path.parent 309 base_path.mkdir(parents=True, exist_ok=True) 310 self._base_path = base_path 311 return str(self._vault_path)
Set or get the path to the database file.
If no path is provided, the current path is returned. If a path is provided, it is set as the new path. The function also creates the necessary directories if the provided path is a file.
Parameters: path (str): The new path to the database file. If not provided, the current path is returned.
Returns: str: The current or new path to the database file.
313 def base_path(self, *args) -> str: 314 """ 315 Generate a base path by joining the provided arguments with the existing base path. 316 317 Parameters: 318 *args (str): Variable length argument list of strings to be joined with the base path. 319 320 Returns: 321 str: The generated base path. If no arguments are provided, the existing base path is returned. 322 """ 323 if not args: 324 return str(self._base_path) 325 filtered_args = [] 326 ignored_filename = None 327 for arg in args: 328 if Path(arg).suffix: 329 ignored_filename = arg 330 else: 331 filtered_args.append(arg) 332 base_path = Path(self._base_path) 333 full_path = base_path.joinpath(*filtered_args) 334 full_path.mkdir(parents=True, exist_ok=True) 335 if ignored_filename is not None: 336 return full_path.resolve() / ignored_filename # Join with the ignored filename 337 return str(full_path.resolve())
Generate a base path by joining the provided arguments with the existing base path.
Parameters: *args (str): Variable length argument list of strings to be joined with the base path.
Returns: str: The generated base path. If no arguments are provided, the existing base path is returned.
339 @staticmethod 340 def scale(x: float | int | Decimal, decimal_places: int = 2) -> int: 341 """ 342 Scales a numerical value by a specified power of 10, returning an integer. 343 344 This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and 345 facilitate precise scaling operations, particularly useful in financial or scientific calculations. 346 347 Parameters: 348 x: The numeric value to scale. Can be a floating-point number, integer, or decimal. 349 decimal_places: The exponent for the scaling factor (10**y). Defaults to 2, meaning the input is scaled 350 by a factor of 100 (e.g., converts 1.23 to 123). 351 352 Returns: 353 The scaled value, rounded to the nearest integer. 354 355 Raises: 356 TypeError: If the input `x` is not a valid numeric type. 357 358 Examples: 359 >>> ZakatTracker.scale(3.14159) 360 314 361 >>> ZakatTracker.scale(1234, decimal_places=3) 362 1234000 363 >>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4) 364 50 365 """ 366 if not isinstance(x, (float, int, Decimal)): 367 raise TypeError("Input 'x' must be a float, int, or Decimal.") 368 return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))
Scales a numerical value by a specified power of 10, returning an integer.
This function is designed to handle various numeric types (float
, int
, or Decimal
) and
facilitate precise scaling operations, particularly useful in financial or scientific calculations.
Parameters: x: The numeric value to scale. Can be a floating-point number, integer, or decimal. decimal_places: The exponent for the scaling factor (10**y). Defaults to 2, meaning the input is scaled by a factor of 100 (e.g., converts 1.23 to 123).
Returns: The scaled value, rounded to the nearest integer.
Raises:
TypeError: If the input x
is not a valid numeric type.
Examples:
>>> ZakatTracker.scale(3.14159)
314
>>> ZakatTracker.scale(1234, decimal_places=3)
1234000
>>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4)
50
370 @staticmethod 371 def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal: 372 """ 373 Unscales an integer by a power of 10. 374 375 Parameters: 376 x: The integer to unscale. 377 return_type: The desired type for the returned value. Can be float, int, or Decimal. Defaults to float. 378 decimal_places: The power of 10 to use. Defaults to 2. 379 380 Returns: 381 The unscaled number, converted to the specified return_type. 382 383 Raises: 384 TypeError: If the return_type is not float or Decimal. 385 """ 386 if return_type not in (float, Decimal): 387 raise TypeError(f'Invalid return_type({return_type}). Supported types are float, int, and Decimal.') 388 return round(return_type(x / (10 ** decimal_places)), decimal_places)
Unscales an integer by a power of 10.
Parameters: x: The integer to unscale. return_type: The desired type for the returned value. Can be float, int, or Decimal. Defaults to float. decimal_places: The power of 10 to use. Defaults to 2.
Returns: The unscaled number, converted to the specified return_type.
Raises: TypeError: If the return_type is not float or Decimal.
404 def reset(self) -> None: 405 """ 406 Reset the internal data structure to its initial state. 407 408 Parameters: 409 None 410 411 Returns: 412 None 413 """ 414 self._vault = { 415 'account': {}, 416 'exchange': {}, 417 'history': {}, 418 'lock': None, 419 'report': {}, 420 }
Reset the internal data structure to its initial state.
Parameters: None
Returns: None
422 @staticmethod 423 def time(now: datetime = None) -> int: 424 """ 425 Generates a timestamp based on the provided datetime object or the current datetime. 426 427 Parameters: 428 now (datetime, optional): The datetime object to generate the timestamp from. 429 If not provided, the current datetime is used. 430 431 Returns: 432 int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970), 433 before 1970 will return in negative until 1000AD. 434 """ 435 if now is None: 436 now = datetime.datetime.now() 437 ordinal_day = now.toordinal() 438 ns_in_day = (now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() * 10 ** 9 439 return int((ordinal_day - 719_163) * 86_400_000_000_000 + ns_in_day)
Generates a timestamp based on the provided datetime object or the current datetime.
Parameters: now (datetime, optional): The datetime object to generate the timestamp from. If not provided, the current datetime is used.
Returns: int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970), before 1970 will return in negative until 1000AD.
441 @staticmethod 442 def time_to_datetime(ordinal_ns: int) -> datetime: 443 """ 444 Converts an ordinal number (number of days since 1000-01-01) to a datetime object. 445 446 Parameters: 447 ordinal_ns (int): The ordinal number of days since 1000-01-01. 448 449 Returns: 450 datetime: The corresponding datetime object. 451 """ 452 ordinal_day = ordinal_ns // 86_400_000_000_000 + 719_163 453 ns_in_day = ordinal_ns % 86_400_000_000_000 454 d = datetime.datetime.fromordinal(ordinal_day) 455 t = datetime.timedelta(seconds=ns_in_day // 10 ** 9) 456 return datetime.datetime.combine(d, datetime.time()) + t
Converts an ordinal number (number of days since 1000-01-01) to a datetime object.
Parameters: ordinal_ns (int): The ordinal number of days since 1000-01-01.
Returns: datetime: The corresponding datetime object.
458 def clean_history(self, lock: int | None = None) -> int: 459 """ 460 Cleans up the history of actions performed on the ZakatTracker instance. 461 462 Parameters: 463 lock (int, optional): The lock ID is used to clean up the empty history. 464 If not provided, it cleans up the empty history records for all locks. 465 466 Returns: 467 int: The number of locks cleaned up. 468 """ 469 count = 0 470 if lock in self._vault['history']: 471 if len(self._vault['history'][lock]) <= 0: 472 count += 1 473 del self._vault['history'][lock] 474 return count 475 self.free(self.lock()) 476 for lock in self._vault['history']: 477 if len(self._vault['history'][lock]) <= 0: 478 count += 1 479 del self._vault['history'][lock] 480 return count
Cleans up the history of actions performed on the ZakatTracker instance.
Parameters: lock (int, optional): The lock ID is used to clean up the empty history. If not provided, it cleans up the empty history records for all locks.
Returns: int: The number of locks cleaned up.
518 def nolock(self) -> bool: 519 """ 520 Check if the vault lock is currently not set. 521 522 Returns: 523 bool: True if the vault lock is not set, False otherwise. 524 """ 525 return self._vault['lock'] is None
Check if the vault lock is currently not set.
Returns: bool: True if the vault lock is not set, False otherwise.
527 def lock(self) -> int: 528 """ 529 Acquires a lock on the ZakatTracker instance. 530 531 Returns: 532 int: The lock ID. This ID can be used to release the lock later. 533 """ 534 return self._step()
Acquires a lock on the ZakatTracker instance.
Returns: int: The lock ID. This ID can be used to release the lock later.
536 def vault(self) -> dict: 537 """ 538 Returns a copy of the internal vault dictionary. 539 540 This method is used to retrieve the current state of the ZakatTracker object. 541 It provides a snapshot of the internal data structure, allowing for further 542 processing or analysis. 543 544 Returns: 545 dict: A copy of the internal vault dictionary. 546 """ 547 return self._vault.copy()
Returns a copy of the internal vault dictionary.
This method is used to retrieve the current state of the ZakatTracker object. It provides a snapshot of the internal data structure, allowing for further processing or analysis.
Returns: dict: A copy of the internal vault dictionary.
549 def stats(self) -> dict[str, tuple]: 550 """ 551 Calculates and returns statistics about the object's data storage. 552 553 This method determines the size of the database file on disk and the 554 size of the data currently held in RAM (likely within a dictionary). 555 Both sizes are reported in bytes and in a human-readable format 556 (e.g., KB, MB). 557 558 Returns: 559 dict[str, tuple]: A dictionary containing the following statistics: 560 561 * 'database': A tuple with two elements: 562 - The database file size in bytes (int). 563 - The database file size in human-readable format (str). 564 * 'ram': A tuple with two elements: 565 - The RAM usage (dictionary size) in bytes (int). 566 - The RAM usage in human-readable format (str). 567 568 Example: 569 >>> stats = my_object.stats() 570 >>> print(stats['database']) 571 (256000, '250.0 KB') 572 >>> print(stats['ram']) 573 (12345, '12.1 KB') 574 """ 575 ram_size = self.get_dict_size(self.vault()) 576 file_size = os.path.getsize(self.path()) 577 return { 578 'database': (file_size, self.human_readable_size(file_size)), 579 'ram': (ram_size, self.human_readable_size(ram_size)), 580 }
Calculates and returns statistics about the object's data storage.
This method determines the size of the database file on disk and the size of the data currently held in RAM (likely within a dictionary). Both sizes are reported in bytes and in a human-readable format (e.g., KB, MB).
Returns: dict[str, tuple]: A dictionary containing the following statistics:
* 'database': A tuple with two elements:
- The database file size in bytes (int).
- The database file size in human-readable format (str).
* 'ram': A tuple with two elements:
- The RAM usage (dictionary size) in bytes (int).
- The RAM usage in human-readable format (str).
Example:
>>> stats = my_object.stats()
>>> print(stats['database'])
(256000, '250.0 KB')
>>> print(stats['ram'])
(12345, '12.1 KB')
582 def files(self) -> list[dict[str, str | int]]: 583 """ 584 Retrieves information about files associated with this class. 585 586 This class method provides a standardized way to gather details about 587 files used by the class for storage, snapshots, and CSV imports. 588 589 Returns: 590 list[dict[str, str | int]]: A list of dictionaries, each containing information 591 about a specific file: 592 593 * type (str): The type of file ('database', 'snapshot', 'import_csv'). 594 * path (str): The full file path. 595 * exists (bool): Whether the file exists on the filesystem. 596 * size (int): The file size in bytes (0 if the file doesn't exist). 597 * human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB'). 598 599 Example: 600 ``` 601 file_info = MyClass.files() 602 for info in file_info: 603 print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}") 604 ``` 605 """ 606 result = [] 607 for file_type, path in { 608 'database': self.path(), 609 'snapshot': self.snapshot_cache_path(), 610 'import_csv': self.import_csv_cache_path(), 611 }.items(): 612 exists = os.path.exists(path) 613 size = os.path.getsize(path) if exists else 0 614 human_readable_size = self.human_readable_size(size) if exists else 0 615 result.append({ 616 'type': file_type, 617 'path': path, 618 'exists': exists, 619 'size': size, 620 'human_readable_size': human_readable_size, 621 }) 622 return result
Retrieves information about files associated with this class.
This class method provides a standardized way to gather details about files used by the class for storage, snapshots, and CSV imports.
Returns: list[dict[str, str | int]]: A list of dictionaries, each containing information about a specific file:
* type (str): The type of file ('database', 'snapshot', 'import_csv').
* path (str): The full file path.
* exists (bool): Whether the file exists on the filesystem.
* size (int): The file size in bytes (0 if the file doesn't exist).
* human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB').
Example:
file_info = MyClass.files()
for info in file_info:
print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}")
624 def steps(self) -> dict: 625 """ 626 Returns a copy of the history of steps taken in the ZakatTracker. 627 628 The history is a dictionary where each key is a unique identifier for a step, 629 and the corresponding value is a dictionary containing information about the step. 630 631 Returns: 632 dict: A copy of the history of steps taken in the ZakatTracker. 633 """ 634 return self._vault['history'].copy()
Returns a copy of the history of steps taken in the ZakatTracker.
The history is a dictionary where each key is a unique identifier for a step, and the corresponding value is a dictionary containing information about the step.
Returns: dict: A copy of the history of steps taken in the ZakatTracker.
636 def free(self, lock: int, auto_save: bool = True) -> bool: 637 """ 638 Releases the lock on the database. 639 640 Parameters: 641 lock (int): The lock ID to be released. 642 auto_save (bool): Whether to automatically save the database after releasing the lock. 643 644 Returns: 645 bool: True if the lock is successfully released and (optionally) saved, False otherwise. 646 """ 647 if lock == self._vault['lock']: 648 self._vault['lock'] = None 649 self.clean_history(lock) 650 if auto_save: 651 return self.save(self.path()) 652 return True 653 return False
Releases the lock on the database.
Parameters: lock (int): The lock ID to be released. auto_save (bool): Whether to automatically save the database after releasing the lock.
Returns: bool: True if the lock is successfully released and (optionally) saved, False otherwise.
655 def account_exists(self, account) -> bool: 656 """ 657 Check if the given account exists in the vault. 658 659 Parameters: 660 account (str): The account number to check. 661 662 Returns: 663 bool: True if the account exists, False otherwise. 664 """ 665 return account in self._vault['account']
Check if the given account exists in the vault.
Parameters: account (str): The account number to check.
Returns: bool: True if the account exists, False otherwise.
667 def box_size(self, account) -> int: 668 """ 669 Calculate the size of the box for a specific account. 670 671 Parameters: 672 account (str): The account number for which the box size needs to be calculated. 673 674 Returns: 675 int: The size of the box for the given account. If the account does not exist, -1 is returned. 676 """ 677 if self.account_exists(account): 678 return len(self._vault['account'][account]['box']) 679 return -1
Calculate the size of the box for a specific account.
Parameters: account (str): The account number for which the box size needs to be calculated.
Returns: int: The size of the box for the given account. If the account does not exist, -1 is returned.
681 def log_size(self, account) -> int: 682 """ 683 Get the size of the log for a specific account. 684 685 Parameters: 686 account (str): The account number for which the log size needs to be calculated. 687 688 Returns: 689 int: The size of the log for the given account. If the account does not exist, -1 is returned. 690 """ 691 if self.account_exists(account): 692 return len(self._vault['account'][account]['log']) 693 return -1
Get the size of the log for a specific account.
Parameters: account (str): The account number for which the log size needs to be calculated.
Returns: int: The size of the log for the given account. If the account does not exist, -1 is returned.
695 @staticmethod 696 def file_hash(file_path: str, algorithm: str = "blake2b") -> str: 697 """ 698 Calculates the hash of a file using the specified algorithm. 699 700 Parameters: 701 file_path (str): The path to the file. 702 algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b". 703 704 Returns: 705 str: The hexadecimal representation of the file's hash. 706 """ 707 hash_obj = hashlib.new(algorithm) # Create the hash object 708 with open(file_path, "rb") as f: # Open file in binary mode for reading 709 for chunk in iter(lambda: f.read(4096), b""): # Read file in chunks 710 hash_obj.update(chunk) 711 return hash_obj.hexdigest() # Return the hash as a hexadecimal string
Calculates the hash of a file using the specified algorithm.
Parameters: file_path (str): The path to the file. algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b".
Returns: str: The hexadecimal representation of the file's hash.
713 def snapshot_cache_path(self): 714 """ 715 Generate the path for the cache file used to store snapshots. 716 717 The cache file is a camel file that stores the timestamps of the snapshots. 718 The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel". 719 720 Returns: 721 str: The path to the cache file. 722 """ 723 path = str(self.path()) 724 ext = self.ext() 725 ext_len = len(ext) 726 if path.endswith(f'.{ext}'): 727 path = path[:-ext_len-1] 728 return path + f'.snapshots.{ext}'
Generate the path for the cache file used to store snapshots.
The cache file is a camel file that stores the timestamps of the snapshots. The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel".
Returns: str: The path to the cache file.
730 def snapshot(self) -> bool: 731 """ 732 This function creates a snapshot of the current database state. 733 734 The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. 735 If a snapshot with the same hash exists, the function returns True without creating a new snapshot. 736 If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state 737 in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp. 738 739 Parameters: 740 None 741 742 Returns: 743 bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails. 744 """ 745 current_hash = self.file_hash(self.path()) 746 cache: dict[str, int] = {} # hash: time_ns 747 try: 748 with open(self.snapshot_cache_path(), 'r') as stream: 749 cache = camel.load(stream.read()) 750 except: 751 pass 752 if current_hash in cache: 753 return True 754 time = time_ns() 755 cache[current_hash] = time 756 if not self.save(self.base_path('snapshots', f'{time}.{self.ext()}')): 757 return False 758 with open(self.snapshot_cache_path(), 'w') as stream: 759 stream.write(camel.dump(cache)) 760 return True
This function creates a snapshot of the current database state.
The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. If a snapshot with the same hash exists, the function returns True without creating a new snapshot. If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp.
Parameters: None
Returns: bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails.
762 def snapshots(self, hide_missing: bool = True, verified_hash_only: bool = False) \ 763 -> dict[int, tuple[str, str, bool]]: 764 """ 765 Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status. 766 767 Parameters: 768 - hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True. 769 - verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False. 770 771 Returns: 772 - dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, 773 and the values are tuples containing the snapshot's hash, path, and existence status. 774 """ 775 cache: dict[str, int] = {} # hash: time_ns 776 try: 777 with open(self.snapshot_cache_path(), 'r') as stream: 778 cache = camel.load(stream.read()) 779 except: 780 pass 781 if not cache: 782 return {} 783 result: dict[int, tuple[str, str, bool]] = {} # time_ns: (hash, path, exists) 784 for file_hash, ref in cache.items(): 785 path = self.base_path('snapshots', f'{ref}.{self.ext()}') 786 exists = os.path.exists(path) 787 valid_hash = self.file_hash(path) == file_hash if verified_hash_only else True 788 if (verified_hash_only and not valid_hash) or (verified_hash_only and not exists): 789 continue 790 if exists or not hide_missing: 791 result[ref] = (file_hash, path, exists) 792 return result
Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status.
Parameters:
- hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True.
- verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False.
Returns:
- dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, and the values are tuples containing the snapshot's hash, path, and existence status.
794 def recall(self, dry=True, debug=False) -> bool: 795 """ 796 Revert the last operation. 797 798 Parameters: 799 dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. 800 debug (bool): If True, the function will print debug information. Default is False. 801 802 Returns: 803 bool: True if the operation was successful, False otherwise. 804 """ 805 if not self.nolock() or len(self._vault['history']) == 0: 806 return False 807 if len(self._vault['history']) <= 0: 808 return False 809 ref = sorted(self._vault['history'].keys())[-1] 810 if debug: 811 print('recall', ref) 812 memory = self._vault['history'][ref] 813 if debug: 814 print(type(memory), 'memory', memory) 815 limit = len(memory) + 1 816 sub_positive_log_negative = 0 817 for i in range(-1, -limit, -1): 818 x = memory[i] 819 if debug: 820 print(type(x), x) 821 match x['action']: 822 case Action.CREATE: 823 if x['account'] is not None: 824 if self.account_exists(x['account']): 825 if debug: 826 print('account', self._vault['account'][x['account']]) 827 assert len(self._vault['account'][x['account']]['box']) == 0 828 assert self._vault['account'][x['account']]['balance'] == 0 829 assert self._vault['account'][x['account']]['count'] == 0 830 if dry: 831 continue 832 del self._vault['account'][x['account']] 833 834 case Action.TRACK: 835 if x['account'] is not None: 836 if self.account_exists(x['account']): 837 if dry: 838 continue 839 self._vault['account'][x['account']]['balance'] -= x['value'] 840 self._vault['account'][x['account']]['count'] -= 1 841 del self._vault['account'][x['account']]['box'][x['ref']] 842 843 case Action.LOG: 844 if x['account'] is not None: 845 if self.account_exists(x['account']): 846 if x['ref'] in self._vault['account'][x['account']]['log']: 847 if dry: 848 continue 849 if sub_positive_log_negative == -x['value']: 850 self._vault['account'][x['account']]['count'] -= 1 851 sub_positive_log_negative = 0 852 box_ref = self._vault['account'][x['account']]['log'][x['ref']]['ref'] 853 if not box_ref is None: 854 assert self.box_exists(x['account'], box_ref) 855 box_value = self._vault['account'][x['account']]['log'][x['ref']]['value'] 856 assert box_value < 0 857 858 try: 859 self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value 860 except TypeError: 861 self._vault['account'][x['account']]['box'][box_ref]['rest'] += Decimal( 862 -box_value) 863 864 try: 865 self._vault['account'][x['account']]['balance'] += -box_value 866 except TypeError: 867 self._vault['account'][x['account']]['balance'] += Decimal(-box_value) 868 869 self._vault['account'][x['account']]['count'] -= 1 870 del self._vault['account'][x['account']]['log'][x['ref']] 871 872 case Action.SUB: 873 if x['account'] is not None: 874 if self.account_exists(x['account']): 875 if x['ref'] in self._vault['account'][x['account']]['box']: 876 if dry: 877 continue 878 self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value'] 879 self._vault['account'][x['account']]['balance'] += x['value'] 880 sub_positive_log_negative = x['value'] 881 882 case Action.ADD_FILE: 883 if x['account'] is not None: 884 if self.account_exists(x['account']): 885 if x['ref'] in self._vault['account'][x['account']]['log']: 886 if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']: 887 if dry: 888 continue 889 del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] 890 891 case Action.REMOVE_FILE: 892 if x['account'] is not None: 893 if self.account_exists(x['account']): 894 if x['ref'] in self._vault['account'][x['account']]['log']: 895 if dry: 896 continue 897 self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value'] 898 899 case Action.BOX_TRANSFER: 900 if x['account'] is not None: 901 if self.account_exists(x['account']): 902 if x['ref'] in self._vault['account'][x['account']]['box']: 903 if dry: 904 continue 905 self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value'] 906 907 case Action.EXCHANGE: 908 if x['account'] is not None: 909 if x['account'] in self._vault['exchange']: 910 if x['ref'] in self._vault['exchange'][x['account']]: 911 if dry: 912 continue 913 del self._vault['exchange'][x['account']][x['ref']] 914 915 case Action.REPORT: 916 if x['ref'] in self._vault['report']: 917 if dry: 918 continue 919 del self._vault['report'][x['ref']] 920 921 case Action.ZAKAT: 922 if x['account'] is not None: 923 if self.account_exists(x['account']): 924 if x['ref'] in self._vault['account'][x['account']]['box']: 925 if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]: 926 if dry: 927 continue 928 match x['math']: 929 case MathOperation.ADDITION: 930 self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x[ 931 'value'] 932 case MathOperation.EQUAL: 933 self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value'] 934 case MathOperation.SUBTRACTION: 935 self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x[ 936 'value'] 937 938 if not dry: 939 del self._vault['history'][ref] 940 return True
Revert the last operation.
Parameters: dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. debug (bool): If True, the function will print debug information. Default is False.
Returns: bool: True if the operation was successful, False otherwise.
942 def ref_exists(self, account: str, ref_type: str, ref: int) -> bool: 943 """ 944 Check if a specific reference (transaction) exists in the vault for a given account and reference type. 945 946 Parameters: 947 account (str): The account number for which to check the existence of the reference. 948 ref_type (str): The type of reference (e.g., 'box', 'log', etc.). 949 ref (int): The reference (transaction) number to check for existence. 950 951 Returns: 952 bool: True if the reference exists for the given account and reference type, False otherwise. 953 """ 954 if account in self._vault['account']: 955 return ref in self._vault['account'][account][ref_type] 956 return False
Check if a specific reference (transaction) exists in the vault for a given account and reference type.
Parameters: account (str): The account number for which to check the existence of the reference. ref_type (str): The type of reference (e.g., 'box', 'log', etc.). ref (int): The reference (transaction) number to check for existence.
Returns: bool: True if the reference exists for the given account and reference type, False otherwise.
958 def box_exists(self, account: str, ref: int) -> bool: 959 """ 960 Check if a specific box (transaction) exists in the vault for a given account and reference. 961 962 Parameters: 963 - account (str): The account number for which to check the existence of the box. 964 - ref (int): The reference (transaction) number to check for existence. 965 966 Returns: 967 - bool: True if the box exists for the given account and reference, False otherwise. 968 """ 969 return self.ref_exists(account, 'box', ref)
Check if a specific box (transaction) exists in the vault for a given account and reference.
Parameters:
- account (str): The account number for which to check the existence of the box.
- ref (int): The reference (transaction) number to check for existence.
Returns:
- bool: True if the box exists for the given account and reference, False otherwise.
971 def track(self, unscaled_value: float | int | Decimal = 0, desc: str = '', account: str = 1, logging: bool = True, 972 created: int = None, 973 debug: bool = False) -> int: 974 """ 975 This function tracks a transaction for a specific account. 976 977 Parameters: 978 unscaled_value (float | int | Decimal): The value of the transaction. Default is 0. 979 desc (str): The description of the transaction. Default is an empty string. 980 account (str): The account for which the transaction is being tracked. Default is '1'. 981 logging (bool): Whether to log the transaction. Default is True. 982 created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. 983 debug (bool): Whether to print debug information. Default is False. 984 985 Returns: 986 int: The timestamp of the transaction. 987 988 This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box. 989 990 Raises: 991 ValueError: The log transaction happened again in the same nanosecond time. 992 ValueError: The box transaction happened again in the same nanosecond time. 993 """ 994 if debug: 995 print('track', f'unscaled_value={unscaled_value}, debug={debug}') 996 if created is None: 997 created = self.time() 998 no_lock = self.nolock() 999 self.lock() 1000 if not self.account_exists(account): 1001 if debug: 1002 print(f"account {account} created") 1003 self._vault['account'][account] = { 1004 'balance': 0, 1005 'box': {}, 1006 'count': 0, 1007 'log': {}, 1008 'hide': False, 1009 'zakatable': True, 1010 } 1011 self._step(Action.CREATE, account) 1012 if unscaled_value == 0: 1013 if no_lock: 1014 self.free(self.lock()) 1015 return 0 1016 value = self.scale(unscaled_value) 1017 if logging: 1018 self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug) 1019 if debug: 1020 print('create-box', created) 1021 if self.box_exists(account, created): 1022 raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).") 1023 if debug: 1024 print('created-box', created) 1025 self._vault['account'][account]['box'][created] = { 1026 'capital': value, 1027 'count': 0, 1028 'last': 0, 1029 'rest': value, 1030 'total': 0, 1031 } 1032 self._step(Action.TRACK, account, ref=created, value=value) 1033 if no_lock: 1034 self.free(self.lock()) 1035 return created
This function tracks a transaction for a specific account.
Parameters: unscaled_value (float | int | Decimal): The value of the transaction. Default is 0. desc (str): The description of the transaction. Default is an empty string. account (str): The account for which the transaction is being tracked. Default is '1'. logging (bool): Whether to log the transaction. Default is True. created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. debug (bool): Whether to print debug information. Default is False.
Returns: int: The timestamp of the transaction.
This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box.
Raises: ValueError: The log transaction happened again in the same nanosecond time. ValueError: The box transaction happened again in the same nanosecond time.
1037 def log_exists(self, account: str, ref: int) -> bool: 1038 """ 1039 Checks if a specific transaction log entry exists for a given account. 1040 1041 Parameters: 1042 account (str): The account number associated with the transaction log. 1043 ref (int): The reference to the transaction log entry. 1044 1045 Returns: 1046 bool: True if the transaction log entry exists, False otherwise. 1047 """ 1048 return self.ref_exists(account, 'log', ref)
Checks if a specific transaction log entry exists for a given account.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry.
Returns: bool: True if the transaction log entry exists, False otherwise.
1094 def exchange(self, account, created: int = None, rate: float = None, description: str = None, 1095 debug: bool = False) -> dict: 1096 """ 1097 This method is used to record or retrieve exchange rates for a specific account. 1098 1099 Parameters: 1100 - account (str): The account number for which the exchange rate is being recorded or retrieved. 1101 - created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used. 1102 - rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate. 1103 - description (str): A description of the exchange rate. 1104 1105 Returns: 1106 - dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, 1107 it returns a dictionary with default values for the rate and description. 1108 """ 1109 if debug: 1110 print('exchange', f'debug={debug}') 1111 if created is None: 1112 created = self.time() 1113 no_lock = self.nolock() 1114 self.lock() 1115 if rate is not None: 1116 if rate <= 0: 1117 return dict() 1118 if account not in self._vault['exchange']: 1119 self._vault['exchange'][account] = {} 1120 if len(self._vault['exchange'][account]) == 0 and rate <= 1: 1121 return {"time": created, "rate": 1, "description": None} 1122 self._vault['exchange'][account][created] = {"rate": rate, "description": description} 1123 self._step(Action.EXCHANGE, account, ref=created, value=rate) 1124 if no_lock: 1125 self.free(self.lock()) 1126 if debug: 1127 print("exchange-created-1", 1128 f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1129 1130 if account in self._vault['exchange']: 1131 valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created] 1132 if valid_rates: 1133 latest_rate = max(valid_rates, key=lambda x: x[0]) 1134 if debug: 1135 print("exchange-read-1", 1136 f'account: {account}, created: {created}, rate:{rate}, description:{description}', 1137 'latest_rate', latest_rate) 1138 result = latest_rate[1] 1139 result['time'] = latest_rate[0] 1140 return result # إرجاع قاموس يحتوي على المعدل والوصف 1141 if debug: 1142 print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1143 return {"time": created, "rate": 1, "description": None} # إرجاع القيمة الافتراضية مع وصف فارغ
This method is used to record or retrieve exchange rates for a specific account.
Parameters:
- account (str): The account number for which the exchange rate is being recorded or retrieved.
- created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used.
- rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate.
- description (str): A description of the exchange rate.
Returns:
- dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, it returns a dictionary with default values for the rate and description.
1145 @staticmethod 1146 def exchange_calc(x: float, x_rate: float, y_rate: float) -> float: 1147 """ 1148 This function calculates the exchanged amount of a currency. 1149 1150 Args: 1151 x (float): The original amount of the currency. 1152 x_rate (float): The exchange rate of the original currency. 1153 y_rate (float): The exchange rate of the target currency. 1154 1155 Returns: 1156 float: The exchanged amount of the target currency. 1157 """ 1158 return (x * x_rate) / y_rate
This function calculates the exchanged amount of a currency.
Args: x (float): The original amount of the currency. x_rate (float): The exchange rate of the original currency. y_rate (float): The exchange rate of the target currency.
Returns: float: The exchanged amount of the target currency.
1160 def exchanges(self) -> dict: 1161 """ 1162 Retrieve the recorded exchange rates for all accounts. 1163 1164 Parameters: 1165 None 1166 1167 Returns: 1168 dict: A dictionary containing all recorded exchange rates. 1169 The keys are account names or numbers, and the values are dictionaries containing the exchange rates. 1170 Each exchange rate dictionary has timestamps as keys and exchange rate details as values. 1171 """ 1172 return self._vault['exchange'].copy()
Retrieve the recorded exchange rates for all accounts.
Parameters: None
Returns: dict: A dictionary containing all recorded exchange rates. The keys are account names or numbers, and the values are dictionaries containing the exchange rates. Each exchange rate dictionary has timestamps as keys and exchange rate details as values.
1174 def accounts(self) -> dict: 1175 """ 1176 Returns a dictionary containing account numbers as keys and their respective balances as values. 1177 1178 Parameters: 1179 None 1180 1181 Returns: 1182 dict: A dictionary where keys are account numbers and values are their respective balances. 1183 """ 1184 result = {} 1185 for i in self._vault['account']: 1186 result[i] = self._vault['account'][i]['balance'] 1187 return result
Returns a dictionary containing account numbers as keys and their respective balances as values.
Parameters: None
Returns: dict: A dictionary where keys are account numbers and values are their respective balances.
1189 def boxes(self, account) -> dict: 1190 """ 1191 Retrieve the boxes (transactions) associated with a specific account. 1192 1193 Parameters: 1194 account (str): The account number for which to retrieve the boxes. 1195 1196 Returns: 1197 dict: A dictionary containing the boxes associated with the given account. 1198 If the account does not exist, an empty dictionary is returned. 1199 """ 1200 if self.account_exists(account): 1201 return self._vault['account'][account]['box'] 1202 return {}
Retrieve the boxes (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the boxes.
Returns: dict: A dictionary containing the boxes associated with the given account. If the account does not exist, an empty dictionary is returned.
1204 def logs(self, account) -> dict: 1205 """ 1206 Retrieve the logs (transactions) associated with a specific account. 1207 1208 Parameters: 1209 account (str): The account number for which to retrieve the logs. 1210 1211 Returns: 1212 dict: A dictionary containing the logs associated with the given account. 1213 If the account does not exist, an empty dictionary is returned. 1214 """ 1215 if self.account_exists(account): 1216 return self._vault['account'][account]['log'] 1217 return {}
Retrieve the logs (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the logs.
Returns: dict: A dictionary containing the logs associated with the given account. If the account does not exist, an empty dictionary is returned.
1219 def add_file(self, account: str, ref: int, path: str) -> int: 1220 """ 1221 Adds a file reference to a specific transaction log entry in the vault. 1222 1223 Parameters: 1224 account (str): The account number associated with the transaction log. 1225 ref (int): The reference to the transaction log entry. 1226 path (str): The path of the file to be added. 1227 1228 Returns: 1229 int: The reference of the added file. If the account or transaction log entry does not exist, returns 0. 1230 """ 1231 if self.account_exists(account): 1232 if ref in self._vault['account'][account]['log']: 1233 file_ref = self.time() 1234 self._vault['account'][account]['log'][ref]['file'][file_ref] = path 1235 no_lock = self.nolock() 1236 self.lock() 1237 self._step(Action.ADD_FILE, account, ref=ref, file=file_ref) 1238 if no_lock: 1239 self.free(self.lock()) 1240 return file_ref 1241 return 0
Adds a file reference to a specific transaction log entry in the vault.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry. path (str): The path of the file to be added.
Returns: int: The reference of the added file. If the account or transaction log entry does not exist, returns 0.
1243 def remove_file(self, account: str, ref: int, file_ref: int) -> bool: 1244 """ 1245 Removes a file reference from a specific transaction log entry in the vault. 1246 1247 Parameters: 1248 account (str): The account number associated with the transaction log. 1249 ref (int): The reference to the transaction log entry. 1250 file_ref (int): The reference of the file to be removed. 1251 1252 Returns: 1253 bool: True if the file reference is successfully removed, False otherwise. 1254 """ 1255 if self.account_exists(account): 1256 if ref in self._vault['account'][account]['log']: 1257 if file_ref in self._vault['account'][account]['log'][ref]['file']: 1258 x = self._vault['account'][account]['log'][ref]['file'][file_ref] 1259 del self._vault['account'][account]['log'][ref]['file'][file_ref] 1260 no_lock = self.nolock() 1261 self.lock() 1262 self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x) 1263 if no_lock: 1264 self.free(self.lock()) 1265 return True 1266 return False
Removes a file reference from a specific transaction log entry in the vault.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry. file_ref (int): The reference of the file to be removed.
Returns: bool: True if the file reference is successfully removed, False otherwise.
1268 def balance(self, account: str = 1, cached: bool = True) -> int: 1269 """ 1270 Calculate and return the balance of a specific account. 1271 1272 Parameters: 1273 account (str): The account number. Default is '1'. 1274 cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True. 1275 1276 Returns: 1277 int: The balance of the account. 1278 1279 Note: 1280 If cached is True, the function returns the cached balance. 1281 If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items. 1282 """ 1283 if cached: 1284 return self._vault['account'][account]['balance'] 1285 x = 0 1286 return [x := x + y['rest'] for y in self._vault['account'][account]['box'].values()][-1]
Calculate and return the balance of a specific account.
Parameters: account (str): The account number. Default is '1'. cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True.
Returns: int: The balance of the account.
Note: If cached is True, the function returns the cached balance. If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items.
1288 def hide(self, account, status: bool = None) -> bool: 1289 """ 1290 Check or set the hide status of a specific account. 1291 1292 Parameters: 1293 account (str): The account number. 1294 status (bool, optional): The new hide status. If not provided, the function will return the current status. 1295 1296 Returns: 1297 bool: The current or updated hide status of the account. 1298 1299 Raises: 1300 None 1301 1302 Example: 1303 >>> tracker = ZakatTracker() 1304 >>> ref = tracker.track(51, 'desc', 'account1') 1305 >>> tracker.hide('account1') # Set the hide status of 'account1' to True 1306 False 1307 >>> tracker.hide('account1', True) # Set the hide status of 'account1' to True 1308 True 1309 >>> tracker.hide('account1') # Get the hide status of 'account1' by default 1310 True 1311 >>> tracker.hide('account1', False) 1312 False 1313 """ 1314 if self.account_exists(account): 1315 if status is None: 1316 return self._vault['account'][account]['hide'] 1317 self._vault['account'][account]['hide'] = status 1318 return status 1319 return False
Check or set the hide status of a specific account.
Parameters: account (str): The account number. status (bool, optional): The new hide status. If not provided, the function will return the current status.
Returns: bool: The current or updated hide status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.hide('account1') # Set the hide status of 'account1' to True
False
>>> tracker.hide('account1', True) # Set the hide status of 'account1' to True
True
>>> tracker.hide('account1') # Get the hide status of 'account1' by default
True
>>> tracker.hide('account1', False)
False
1321 def zakatable(self, account, status: bool = None) -> bool: 1322 """ 1323 Check or set the zakatable status of a specific account. 1324 1325 Parameters: 1326 account (str): The account number. 1327 status (bool, optional): The new zakatable status. If not provided, the function will return the current status. 1328 1329 Returns: 1330 bool: The current or updated zakatable status of the account. 1331 1332 Raises: 1333 None 1334 1335 Example: 1336 >>> tracker = ZakatTracker() 1337 >>> ref = tracker.track(51, 'desc', 'account1') 1338 >>> tracker.zakatable('account1') # Set the zakatable status of 'account1' to True 1339 True 1340 >>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True 1341 True 1342 >>> tracker.zakatable('account1') # Get the zakatable status of 'account1' by default 1343 True 1344 >>> tracker.zakatable('account1', False) 1345 False 1346 """ 1347 if self.account_exists(account): 1348 if status is None: 1349 return self._vault['account'][account]['zakatable'] 1350 self._vault['account'][account]['zakatable'] = status 1351 return status 1352 return False
Check or set the zakatable status of a specific account.
Parameters: account (str): The account number. status (bool, optional): The new zakatable status. If not provided, the function will return the current status.
Returns: bool: The current or updated zakatable status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.zakatable('account1') # Set the zakatable status of 'account1' to True
True
>>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True
True
>>> tracker.zakatable('account1') # Get the zakatable status of 'account1' by default
True
>>> tracker.zakatable('account1', False)
False
1354 def sub(self, unscaled_value: float | int | Decimal, desc: str = '', account: str = 1, created: int = None, 1355 debug: bool = False) \ 1356 -> tuple[ 1357 int, 1358 list[ 1359 tuple[int, int], 1360 ], 1361 ] | tuple: 1362 """ 1363 Subtracts a specified value from an account's balance. 1364 1365 Parameters: 1366 unscaled_value (float | int | Decimal): The amount to be subtracted. 1367 desc (str): A description for the transaction. Defaults to an empty string. 1368 account (str): The account from which the value will be subtracted. Defaults to '1'. 1369 created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. 1370 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1371 1372 Returns: 1373 tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction. 1374 1375 If the amount to subtract is greater than the account's balance, 1376 the remaining amount will be transferred to a new transaction with a negative value. 1377 1378 Raises: 1379 ValueError: The box transaction happened again in the same nanosecond time. 1380 ValueError: The log transaction happened again in the same nanosecond time. 1381 """ 1382 if debug: 1383 print('sub', f'debug={debug}') 1384 if unscaled_value < 0: 1385 return tuple() 1386 if unscaled_value == 0: 1387 ref = self.track(unscaled_value, '', account) 1388 return ref, ref 1389 if created is None: 1390 created = self.time() 1391 no_lock = self.nolock() 1392 self.lock() 1393 self.track(0, '', account) 1394 value = self.scale(unscaled_value) 1395 self._log(value=-value, desc=desc, account=account, created=created, ref=None, debug=debug) 1396 ids = sorted(self._vault['account'][account]['box'].keys()) 1397 limit = len(ids) + 1 1398 target = value 1399 if debug: 1400 print('ids', ids) 1401 ages = [] 1402 for i in range(-1, -limit, -1): 1403 if target == 0: 1404 break 1405 j = ids[i] 1406 if debug: 1407 print('i', i, 'j', j) 1408 rest = self._vault['account'][account]['box'][j]['rest'] 1409 if rest >= target: 1410 self._vault['account'][account]['box'][j]['rest'] -= target 1411 self._step(Action.SUB, account, ref=j, value=target) 1412 ages.append((j, target)) 1413 target = 0 1414 break 1415 elif target > rest > 0: 1416 chunk = rest 1417 target -= chunk 1418 self._step(Action.SUB, account, ref=j, value=chunk) 1419 ages.append((j, chunk)) 1420 self._vault['account'][account]['box'][j]['rest'] = 0 1421 if target > 0: 1422 self.track( 1423 unscaled_value=self.unscale(-target), 1424 desc=desc, 1425 account=account, 1426 logging=False, 1427 created=created, 1428 ) 1429 ages.append((created, target)) 1430 if no_lock: 1431 self.free(self.lock()) 1432 return created, ages
Subtracts a specified value from an account's balance.
Parameters: unscaled_value (float | int | Decimal): The amount to be subtracted. desc (str): A description for the transaction. Defaults to an empty string. account (str): The account from which the value will be subtracted. Defaults to '1'. created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction.
If the amount to subtract is greater than the account's balance, the remaining amount will be transferred to a new transaction with a negative value.
Raises: ValueError: The box transaction happened again in the same nanosecond time. ValueError: The log transaction happened again in the same nanosecond time.
1434 def transfer(self, unscaled_amount: float | int | Decimal, from_account: str, to_account: str, desc: str = '', 1435 created: int = None, 1436 debug: bool = False) -> list[int]: 1437 """ 1438 Transfers a specified value from one account to another. 1439 1440 Parameters: 1441 unscaled_amount (float | int | Decimal): The amount to be transferred. 1442 from_account (str): The account from which the value will be transferred. 1443 to_account (str): The account to which the value will be transferred. 1444 desc (str, optional): A description for the transaction. Defaults to an empty string. 1445 created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. 1446 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1447 1448 Returns: 1449 list[int]: A list of timestamps corresponding to the transactions made during the transfer. 1450 1451 Raises: 1452 ValueError: Transfer to the same account is forbidden. 1453 ValueError: The box transaction happened again in the same nanosecond time. 1454 ValueError: The log transaction happened again in the same nanosecond time. 1455 """ 1456 if debug: 1457 print('transfer', f'debug={debug}') 1458 if from_account == to_account: 1459 raise ValueError(f'Transfer to the same account is forbidden. {to_account}') 1460 if unscaled_amount <= 0: 1461 return [] 1462 if created is None: 1463 created = self.time() 1464 (_, ages) = self.sub(unscaled_amount, desc, from_account, created, debug=debug) 1465 times = [] 1466 source_exchange = self.exchange(from_account, created) 1467 target_exchange = self.exchange(to_account, created) 1468 1469 if debug: 1470 print('ages', ages) 1471 1472 for age, value in ages: 1473 target_amount = int(self.exchange_calc(value, source_exchange['rate'], target_exchange['rate'])) 1474 if debug: 1475 print('target_amount', target_amount) 1476 # Perform the transfer 1477 if self.box_exists(to_account, age): 1478 if debug: 1479 print('box_exists', age) 1480 capital = self._vault['account'][to_account]['box'][age]['capital'] 1481 rest = self._vault['account'][to_account]['box'][age]['rest'] 1482 if debug: 1483 print( 1484 f"Transfer(loop) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1485 selected_age = age 1486 if rest + target_amount > capital: 1487 self._vault['account'][to_account]['box'][age]['capital'] += target_amount 1488 selected_age = ZakatTracker.time() 1489 self._vault['account'][to_account]['box'][age]['rest'] += target_amount 1490 self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount) 1491 y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account, 1492 created=None, ref=None, debug=debug) 1493 times.append((age, y)) 1494 continue 1495 if debug: 1496 print( 1497 f"Transfer(func) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1498 y = self.track( 1499 unscaled_value=self.unscale(int(target_amount)), 1500 desc=desc, 1501 account=to_account, 1502 logging=True, 1503 created=age, 1504 debug=debug, 1505 ) 1506 times.append(y) 1507 return times
Transfers a specified value from one account to another.
Parameters: unscaled_amount (float | int | Decimal): The amount to be transferred. from_account (str): The account from which the value will be transferred. to_account (str): The account to which the value will be transferred. desc (str, optional): A description for the transaction. Defaults to an empty string. created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: list[int]: A list of timestamps corresponding to the transactions made during the transfer.
Raises: ValueError: Transfer to the same account is forbidden. ValueError: The box transaction happened again in the same nanosecond time. ValueError: The log transaction happened again in the same nanosecond time.
1509 def check(self, silver_gram_price: float, unscaled_nisab: float | int | Decimal = None, debug: bool = False, now: int = None, 1510 cycle: float = None) -> tuple: 1511 """ 1512 Check the eligibility for Zakat based on the given parameters. 1513 1514 Parameters: 1515 silver_gram_price (float): The price of a gram of silver. 1516 unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, 1517 it will be calculated based on the silver_gram_price. 1518 debug (bool): Flag to enable debug mode. 1519 now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). 1520 cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle(). 1521 1522 Returns: 1523 tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, 1524 and a dictionary containing the Zakat plan. 1525 """ 1526 if debug: 1527 print('check', f'debug={debug}') 1528 if now is None: 1529 now = self.time() 1530 if cycle is None: 1531 cycle = ZakatTracker.TimeCycle() 1532 if unscaled_nisab is None: 1533 unscaled_nisab = ZakatTracker.Nisab(silver_gram_price) 1534 nisab = self.scale(unscaled_nisab) 1535 plan = {} 1536 below_nisab = 0 1537 brief = [0, 0, 0] 1538 valid = False 1539 if debug: 1540 print('exchanges', self.exchanges()) 1541 for x in self._vault['account']: 1542 if not self.zakatable(x): 1543 continue 1544 _box = self._vault['account'][x]['box'] 1545 _log = self._vault['account'][x]['log'] 1546 limit = len(_box) + 1 1547 ids = sorted(self._vault['account'][x]['box'].keys()) 1548 for i in range(-1, -limit, -1): 1549 j = ids[i] 1550 rest = float(_box[j]['rest']) 1551 if rest <= 0: 1552 continue 1553 exchange = self.exchange(x, created=self.time()) 1554 rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1) 1555 brief[0] += rest 1556 index = limit + i - 1 1557 epoch = (now - j) / cycle 1558 if debug: 1559 print(f"Epoch: {epoch}", _box[j]) 1560 if _box[j]['last'] > 0: 1561 epoch = (now - _box[j]['last']) / cycle 1562 if debug: 1563 print(f"Epoch: {epoch}") 1564 epoch = floor(epoch) 1565 if debug: 1566 print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch) 1567 if epoch == 0: 1568 continue 1569 if debug: 1570 print("Epoch - PASSED") 1571 brief[1] += rest 1572 if rest >= nisab: 1573 total = 0 1574 for _ in range(epoch): 1575 total += ZakatTracker.ZakatCut(float(rest) - float(total)) 1576 if total > 0: 1577 if x not in plan: 1578 plan[x] = {} 1579 valid = True 1580 brief[2] += total 1581 plan[x][index] = { 1582 'total': total, 1583 'count': epoch, 1584 'box_time': j, 1585 'box_capital': _box[j]['capital'], 1586 'box_rest': _box[j]['rest'], 1587 'box_last': _box[j]['last'], 1588 'box_total': _box[j]['total'], 1589 'box_count': _box[j]['count'], 1590 'box_log': _log[j]['desc'], 1591 'exchange_rate': exchange['rate'], 1592 'exchange_time': exchange['time'], 1593 'exchange_desc': exchange['description'], 1594 } 1595 else: 1596 chunk = ZakatTracker.ZakatCut(float(rest)) 1597 if chunk > 0: 1598 if x not in plan: 1599 plan[x] = {} 1600 if j not in plan[x].keys(): 1601 plan[x][index] = {} 1602 below_nisab += rest 1603 brief[2] += chunk 1604 plan[x][index]['below_nisab'] = chunk 1605 plan[x][index]['total'] = chunk 1606 plan[x][index]['count'] = epoch 1607 plan[x][index]['box_time'] = j 1608 plan[x][index]['box_capital'] = _box[j]['capital'] 1609 plan[x][index]['box_rest'] = _box[j]['rest'] 1610 plan[x][index]['box_last'] = _box[j]['last'] 1611 plan[x][index]['box_total'] = _box[j]['total'] 1612 plan[x][index]['box_count'] = _box[j]['count'] 1613 plan[x][index]['box_log'] = _log[j]['desc'] 1614 plan[x][index]['exchange_rate'] = exchange['rate'] 1615 plan[x][index]['exchange_time'] = exchange['time'] 1616 plan[x][index]['exchange_desc'] = exchange['description'] 1617 valid = valid or below_nisab >= nisab 1618 if debug: 1619 print(f"below_nisab({below_nisab}) >= nisab({nisab})") 1620 return valid, brief, plan
Check the eligibility for Zakat based on the given parameters.
Parameters: silver_gram_price (float): The price of a gram of silver. unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, it will be calculated based on the silver_gram_price. debug (bool): Flag to enable debug mode. now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle().
Returns: tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, and a dictionary containing the Zakat plan.
1622 def build_payment_parts(self, demand: float, positive_only: bool = True) -> dict: 1623 """ 1624 Build payment parts for the Zakat distribution. 1625 1626 Parameters: 1627 demand (float): The total demand for payment in local currency. 1628 positive_only (bool): If True, only consider accounts with positive balance. Default is True. 1629 1630 Returns: 1631 dict: A dictionary containing the payment parts for each account. The dictionary has the following structure: 1632 { 1633 'account': { 1634 'account_id': {'balance': float, 'rate': float, 'part': float}, 1635 ... 1636 }, 1637 'exceed': bool, 1638 'demand': float, 1639 'total': float, 1640 } 1641 """ 1642 total = 0 1643 parts = { 1644 'account': {}, 1645 'exceed': False, 1646 'demand': demand, 1647 } 1648 for x, y in self.accounts().items(): 1649 if positive_only and y <= 0: 1650 continue 1651 total += float(y) 1652 exchange = self.exchange(x) 1653 parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0} 1654 parts['total'] = total 1655 return parts
Build payment parts for the Zakat distribution.
Parameters: demand (float): The total demand for payment in local currency. positive_only (bool): If True, only consider accounts with positive balance. Default is True.
Returns: dict: A dictionary containing the payment parts for each account. The dictionary has the following structure: { 'account': { 'account_id': {'balance': float, 'rate': float, 'part': float}, ... }, 'exceed': bool, 'demand': float, 'total': float, }
1657 @staticmethod 1658 def check_payment_parts(parts: dict, debug: bool = False) -> int: 1659 """ 1660 Checks the validity of payment parts. 1661 1662 Parameters: 1663 parts (dict): A dictionary containing payment parts information. 1664 debug (bool): Flag to enable debug mode. 1665 1666 Returns: 1667 int: Returns 0 if the payment parts are valid, otherwise returns the error code. 1668 1669 Error Codes: 1670 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 1671 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 1672 3: 'part' value in parts['account'][x] is less than 0. 1673 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 1674 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 1675 6: The sum of 'part' values in parts['account'] does not match with 'demand' value. 1676 """ 1677 if debug: 1678 print('check_payment_parts', f'debug={debug}') 1679 for i in ['demand', 'account', 'total', 'exceed']: 1680 if i not in parts: 1681 return 1 1682 exceed = parts['exceed'] 1683 for x in parts['account']: 1684 for j in ['balance', 'rate', 'part']: 1685 if j not in parts['account'][x]: 1686 return 2 1687 if parts['account'][x]['part'] < 0: 1688 return 3 1689 if not exceed and parts['account'][x]['balance'] <= 0: 1690 return 4 1691 demand = parts['demand'] 1692 z = 0 1693 for _, y in parts['account'].items(): 1694 if not exceed and y['part'] > y['balance']: 1695 return 5 1696 z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1) 1697 z = round(z, 2) 1698 demand = round(demand, 2) 1699 if debug: 1700 print('check_payment_parts', f'z = {z}, demand = {demand}') 1701 print('check_payment_parts', type(z), type(demand)) 1702 print('check_payment_parts', z != demand) 1703 print('check_payment_parts', str(z) != str(demand)) 1704 if z != demand and str(z) != str(demand): 1705 return 6 1706 return 0
Checks the validity of payment parts.
Parameters: parts (dict): A dictionary containing payment parts information. debug (bool): Flag to enable debug mode.
Returns: int: Returns 0 if the payment parts are valid, otherwise returns the error code.
Error Codes: 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 3: 'part' value in parts['account'][x] is less than 0. 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 6: The sum of 'part' values in parts['account'] does not match with 'demand' value.
1708 def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool: 1709 """ 1710 Perform Zakat calculation based on the given report and optional parts. 1711 1712 Parameters: 1713 report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. 1714 parts (dict): A dictionary containing the payment parts for the zakat. 1715 debug (bool): A flag indicating whether to print debug information. 1716 1717 Returns: 1718 bool: True if the zakat calculation is successful, False otherwise. 1719 """ 1720 if debug: 1721 print('zakat', f'debug={debug}') 1722 valid, _, plan = report 1723 if not valid: 1724 return valid 1725 parts_exist = parts is not None 1726 if parts_exist: 1727 if self.check_payment_parts(parts, debug=debug) != 0: 1728 return False 1729 if debug: 1730 print('######### zakat #######') 1731 print('parts_exist', parts_exist) 1732 no_lock = self.nolock() 1733 self.lock() 1734 report_time = self.time() 1735 self._vault['report'][report_time] = report 1736 self._step(Action.REPORT, ref=report_time) 1737 created = self.time() 1738 for x in plan: 1739 target_exchange = self.exchange(x) 1740 if debug: 1741 print(plan[x]) 1742 print('-------------') 1743 print(self._vault['account'][x]['box']) 1744 ids = sorted(self._vault['account'][x]['box'].keys()) 1745 if debug: 1746 print('plan[x]', plan[x]) 1747 for i in plan[x].keys(): 1748 j = ids[i] 1749 if debug: 1750 print('i', i, 'j', j) 1751 self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'], 1752 key='last', 1753 math_operation=MathOperation.EQUAL) 1754 self._vault['account'][x]['box'][j]['last'] = created 1755 amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate'])) 1756 self._vault['account'][x]['box'][j]['total'] += amount 1757 self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total', 1758 math_operation=MathOperation.ADDITION) 1759 self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count'] 1760 self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count', 1761 math_operation=MathOperation.ADDITION) 1762 if not parts_exist: 1763 try: 1764 self._vault['account'][x]['box'][j]['rest'] -= amount 1765 except TypeError: 1766 self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount) 1767 # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest', 1768 # math_operation=MathOperation.SUBTRACTION) 1769 self._log(-float(amount), desc='zakat-زكاة', account=x, created=None, ref=j, debug=debug) 1770 if parts_exist: 1771 for account, part in parts['account'].items(): 1772 if part['part'] == 0: 1773 continue 1774 if debug: 1775 print('zakat-part', account, part['rate']) 1776 target_exchange = self.exchange(account) 1777 amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate']) 1778 self.sub(amount, desc='zakat-part-دفعة-زكاة', account=account, debug=debug) 1779 if no_lock: 1780 self.free(self.lock()) 1781 return True
Perform Zakat calculation based on the given report and optional parts.
Parameters: report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. parts (dict): A dictionary containing the payment parts for the zakat. debug (bool): A flag indicating whether to print debug information.
Returns: bool: True if the zakat calculation is successful, False otherwise.
1783 def export_json(self, path: str = "data.json") -> bool: 1784 """ 1785 Exports the current state of the ZakatTracker object to a JSON file. 1786 1787 Parameters: 1788 path (str): The path where the JSON file will be saved. Default is "data.json". 1789 1790 Returns: 1791 bool: True if the export is successful, False otherwise. 1792 1793 Raises: 1794 No specific exceptions are raised by this method. 1795 """ 1796 with open(path, "w") as file: 1797 json.dump(self._vault, file, indent=4, cls=JSONEncoder) 1798 return True
Exports the current state of the ZakatTracker object to a JSON file.
Parameters: path (str): The path where the JSON file will be saved. Default is "data.json".
Returns: bool: True if the export is successful, False otherwise.
Raises: No specific exceptions are raised by this method.
1800 def save(self, path: str = None) -> bool: 1801 """ 1802 Saves the ZakatTracker's current state to a camel file. 1803 1804 This method serializes the internal data (`_vault`). 1805 1806 Parameters: 1807 path (str, optional): File path for saving. Defaults to a predefined location. 1808 1809 Returns: 1810 bool: True if the save operation is successful, False otherwise. 1811 """ 1812 if path is None: 1813 path = self.path() 1814 with open(f'{path}.tmp', 'w') as stream: 1815 # first save in tmp file 1816 stream.write(camel.dump(self._vault)) 1817 # then move tmp file to original location 1818 shutil.move(f'{path}.tmp', path) 1819 return True
Saves the ZakatTracker's current state to a camel file.
This method serializes the internal data (_vault
).
Parameters: path (str, optional): File path for saving. Defaults to a predefined location.
Returns: bool: True if the save operation is successful, False otherwise.
1821 def load(self, path: str = None) -> bool: 1822 """ 1823 Load the current state of the ZakatTracker object from a camel file. 1824 1825 Parameters: 1826 path (str): The path where the camel file is located. If not provided, it will use the default path. 1827 1828 Returns: 1829 bool: True if the load operation is successful, False otherwise. 1830 """ 1831 if path is None: 1832 path = self.path() 1833 if os.path.exists(path): 1834 with open(path, 'r') as stream: 1835 self._vault = camel.load(stream.read()) 1836 return True 1837 return False
Load the current state of the ZakatTracker object from a camel file.
Parameters: path (str): The path where the camel file is located. If not provided, it will use the default path.
Returns: bool: True if the load operation is successful, False otherwise.
1839 def import_csv_cache_path(self): 1840 """ 1841 Generates the cache file path for imported CSV data. 1842 1843 This function constructs the file path where cached data from CSV imports 1844 will be stored. The cache file is a camel file (.camel extension) appended 1845 to the base path of the object. 1846 1847 Returns: 1848 str: The full path to the import CSV cache file. 1849 1850 Example: 1851 >>> obj = ZakatTracker('/data/reports') 1852 >>> obj.import_csv_cache_path() 1853 '/data/reports.import_csv.camel' 1854 """ 1855 path = str(self.path()) 1856 ext = self.ext() 1857 ext_len = len(ext) 1858 if path.endswith(f'.{ext}'): 1859 path = path[:-ext_len-1] 1860 return path + f'.import_csv.{ext}'
Generates the cache file path for imported CSV data.
This function constructs the file path where cached data from CSV imports will be stored. The cache file is a camel file (.camel extension) appended to the base path of the object.
Returns: str: The full path to the import CSV cache file.
Example:
obj = ZakatTracker('/data/reports') obj.import_csv_cache_path() '/data/reports.import_csv.camel'
1862 def import_csv(self, path: str = 'file.csv', debug: bool = False) -> tuple: 1863 """ 1864 The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system. 1865 1866 Parameters: 1867 path (str): The path to the CSV file. Default is 'file.csv'. 1868 debug (bool): A flag indicating whether to print debug information. 1869 1870 Returns: 1871 tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, 1872 and a dictionary of bad transactions. 1873 1874 Notes: 1875 * Currency Pair Assumption: This function assumes that the exchange rates stored for each account 1876 are appropriate for the currency pairs involved in the conversions. 1877 * The exchange rate for each account is based on the last encountered transaction rate that is not equal 1878 to 1.0 or the previous rate for that account. 1879 * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent 1880 transactions of the same account within the whole imported and existing dataset when doing `check` and 1881 `zakat` operations. 1882 1883 Example Usage: 1884 The CSV file should have the following format, rate is optional per transaction: 1885 account, desc, value, date, rate 1886 For example: 1887 safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1 1888 """ 1889 if debug: 1890 print('import_csv', f'debug={debug}') 1891 cache: list[int] = [] 1892 try: 1893 with open(self.import_csv_cache_path(), 'r') as stream: 1894 cache = camel.load(stream.read()) 1895 except: 1896 pass 1897 date_formats = [ 1898 "%Y-%m-%d %H:%M:%S", 1899 "%Y-%m-%dT%H:%M:%S", 1900 "%Y-%m-%dT%H%M%S", 1901 "%Y-%m-%d", 1902 ] 1903 created, found, bad = 0, 0, {} 1904 data: dict[int, list] = {} 1905 with open(path, newline='', encoding="utf-8") as f: 1906 i = 0 1907 for row in csv.reader(f, delimiter=','): 1908 i += 1 1909 hashed = hash(tuple(row)) 1910 if hashed in cache: 1911 found += 1 1912 continue 1913 account = row[0] 1914 desc = row[1] 1915 value = float(row[2]) 1916 rate = 1.0 1917 if row[4:5]: # Empty list if index is out of range 1918 rate = float(row[4]) 1919 date: int = 0 1920 for time_format in date_formats: 1921 try: 1922 date = self.time(datetime.datetime.strptime(row[3], time_format)) 1923 break 1924 except: 1925 pass 1926 # TODO: not allowed for negative dates in the future after enhance time functions 1927 if date == 0 or value == 0: 1928 bad[i] = row + ('invalid date',) 1929 continue 1930 if date not in data: 1931 data[date] = [] 1932 data[date].append((i, account, desc, value, date, rate, hashed)) 1933 1934 if debug: 1935 print('import_csv', len(data)) 1936 1937 if bad: 1938 return created, found, bad 1939 1940 for date, rows in sorted(data.items()): 1941 try: 1942 len_rows = len(rows) 1943 if len_rows == 1: 1944 (_, account, desc, value, date, rate, hashed) = rows[0] 1945 if rate > 0: 1946 self.exchange(account, created=date, rate=rate) 1947 if value > 0: 1948 self.track(value, desc, account, True, date) 1949 elif value < 0: 1950 self.sub(-value, desc, account, date) 1951 created += 1 1952 cache.append(hashed) 1953 continue 1954 if debug: 1955 print('-- Duplicated time detected', date, 'len', len_rows) 1956 print(rows) 1957 print('---------------------------------') 1958 # If records are found at the same time with different accounts in the same amount 1959 # (one positive and the other negative), this indicates it is a transfer. 1960 if len_rows != 2: 1961 raise Exception(f'more than two transactions({len_rows}) at the same time') 1962 (i, account1, desc1, value1, date1, rate1, _) = rows[0] 1963 (j, account2, desc2, value2, date2, rate2, _) = rows[1] 1964 if account1 == account2 or desc1 != desc2 or abs(value1) != abs(value2) or date1 != date2: 1965 raise Exception('invalid transfer') 1966 if rate1 > 0: 1967 self.exchange(account1, created=date1, rate=rate1) 1968 if rate2 > 0: 1969 self.exchange(account2, created=date2, rate=rate2) 1970 values = { 1971 value1: account1, 1972 value2: account2, 1973 } 1974 self.transfer( 1975 unscaled_amount=abs(value1), 1976 from_account=values[min(values.keys())], 1977 to_account=values[max(values.keys())], 1978 desc=desc1, 1979 created=date1, 1980 ) 1981 except Exception as e: 1982 for (i, account, desc, value, date, rate, _) in rows: 1983 bad[i] = (account, desc, value, date, rate, e) 1984 break 1985 with open(self.import_csv_cache_path(), 'w') as stream: 1986 stream.write(camel.dump(cache)) 1987 return created, found, bad
The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system.
Parameters: path (str): The path to the CSV file. Default is 'file.csv'. debug (bool): A flag indicating whether to print debug information.
Returns: tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, and a dictionary of bad transactions.
Notes:
* Currency Pair Assumption: This function assumes that the exchange rates stored for each account
are appropriate for the currency pairs involved in the conversions.
* The exchange rate for each account is based on the last encountered transaction rate that is not equal
to 1.0 or the previous rate for that account.
* Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent
transactions of the same account within the whole imported and existing dataset when doing check
and
zakat
operations.
Example Usage: The CSV file should have the following format, rate is optional per transaction: account, desc, value, date, rate For example: safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1
1993 @staticmethod 1994 def human_readable_size(size: float, decimal_places: int = 2) -> str: 1995 """ 1996 Converts a size in bytes to a human-readable format (e.g., KB, MB, GB). 1997 1998 This function iterates through progressively larger units of information 1999 (B, KB, MB, GB, etc.) and divides the input size until it fits within a 2000 range that can be expressed with a reasonable number before the unit. 2001 2002 Parameters: 2003 size (float): The size in bytes to convert. 2004 decimal_places (int, optional): The number of decimal places to display 2005 in the result. Defaults to 2. 2006 2007 Returns: 2008 str: A string representation of the size in a human-readable format, 2009 rounded to the specified number of decimal places. For example: 2010 - "1.50 KB" (1536 bytes) 2011 - "23.00 MB" (24117248 bytes) 2012 - "1.23 GB" (1325899906 bytes) 2013 """ 2014 if type(size) not in (float, int): 2015 raise TypeError("size must be a float or integer") 2016 if type(decimal_places) != int: 2017 raise TypeError("decimal_places must be an integer") 2018 for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: 2019 if size < 1024.0: 2020 break 2021 size /= 1024.0 2022 return f"{size:.{decimal_places}f} {unit}"
Converts a size in bytes to a human-readable format (e.g., KB, MB, GB).
This function iterates through progressively larger units of information (B, KB, MB, GB, etc.) and divides the input size until it fits within a range that can be expressed with a reasonable number before the unit.
Parameters: size (float): The size in bytes to convert. decimal_places (int, optional): The number of decimal places to display in the result. Defaults to 2.
Returns: str: A string representation of the size in a human-readable format, rounded to the specified number of decimal places. For example: - "1.50 KB" (1536 bytes) - "23.00 MB" (24117248 bytes) - "1.23 GB" (1325899906 bytes)
2024 @staticmethod 2025 def get_dict_size(obj: dict, seen: set = None) -> float: 2026 """ 2027 Recursively calculates the approximate memory size of a dictionary and its contents in bytes. 2028 2029 This function traverses the dictionary structure, accounting for the size of keys, values, 2030 and any nested objects. It handles various data types commonly found in dictionaries 2031 (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case 2032 of circular references. 2033 2034 Parameters: 2035 obj (dict): The dictionary whose size is to be calculated. 2036 seen (set, optional): A set used internally to track visited objects 2037 and avoid circular references. Defaults to None. 2038 2039 Returns: 2040 float: An approximate size of the dictionary and its contents in bytes. 2041 2042 Note: 2043 - This function is a method of the `ZakatTracker` class and is likely used to 2044 estimate the memory footprint of data structures relevant to Zakat calculations. 2045 - The size calculation is approximate as it relies on `sys.getsizeof()`, which might 2046 not account for all memory overhead depending on the Python implementation. 2047 - Circular references are handled to prevent infinite recursion. 2048 - Basic numeric types (int, float, complex) are assumed to have fixed sizes. 2049 - String sizes are estimated based on character length and encoding. 2050 """ 2051 size = 0 2052 if seen is None: 2053 seen = set() 2054 2055 obj_id = id(obj) 2056 if obj_id in seen: 2057 return 0 2058 2059 seen.add(obj_id) 2060 size += sys.getsizeof(obj) 2061 2062 if isinstance(obj, dict): 2063 for k, v in obj.items(): 2064 size += ZakatTracker.get_dict_size(k, seen) 2065 size += ZakatTracker.get_dict_size(v, seen) 2066 elif isinstance(obj, (list, tuple, set, frozenset)): 2067 for item in obj: 2068 size += ZakatTracker.get_dict_size(item, seen) 2069 elif isinstance(obj, (int, float, complex)): # Handle numbers 2070 pass # Basic numbers have a fixed size, so nothing to add here 2071 elif isinstance(obj, str): # Handle strings 2072 size += len(obj) * sys.getsizeof(str().encode()) # Size per character in bytes 2073 return size
Recursively calculates the approximate memory size of a dictionary and its contents in bytes.
This function traverses the dictionary structure, accounting for the size of keys, values, and any nested objects. It handles various data types commonly found in dictionaries (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case of circular references.
Parameters: obj (dict): The dictionary whose size is to be calculated. seen (set, optional): A set used internally to track visited objects and avoid circular references. Defaults to None.
Returns: float: An approximate size of the dictionary and its contents in bytes.
Note:
- This function is a method of the
ZakatTracker
class and is likely used to estimate the memory footprint of data structures relevant to Zakat calculations. - The size calculation is approximate as it relies on
sys.getsizeof()
, which might not account for all memory overhead depending on the Python implementation. - Circular references are handled to prevent infinite recursion.
- Basic numeric types (int, float, complex) are assumed to have fixed sizes.
- String sizes are estimated based on character length and encoding.
2075 @staticmethod 2076 def duration_from_nanoseconds(ns: int, 2077 show_zeros_in_spoken_time: bool = False, 2078 spoken_time_separator=',', 2079 millennia: str = 'Millennia', 2080 century: str = 'Century', 2081 years: str = 'Years', 2082 days: str = 'Days', 2083 hours: str = 'Hours', 2084 minutes: str = 'Minutes', 2085 seconds: str = 'Seconds', 2086 milli_seconds: str = 'MilliSeconds', 2087 micro_seconds: str = 'MicroSeconds', 2088 nano_seconds: str = 'NanoSeconds', 2089 ) -> tuple: 2090 """ 2091 REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 2092 Convert NanoSeconds to Human Readable Time Format. 2093 A NanoSeconds is a unit of time in the International System of Units (SI) equal 2094 to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second. 2095 Its symbol is μs, sometimes simplified to us when Unicode is not available. 2096 A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond. 2097 2098 INPUT : ms (AKA: MilliSeconds) 2099 OUTPUT: tuple(string time_lapsed, string spoken_time) like format. 2100 OUTPUT Variables: time_lapsed, spoken_time 2101 2102 Example Input: duration_from_nanoseconds(ns) 2103 **"Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds"** 2104 Example Output: ('039:0001:047:325:05:02:03:456:789:012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds') 2105 duration_from_nanoseconds(1234567890123456789012) 2106 """ 2107 us, ns = divmod(ns, 1000) 2108 ms, us = divmod(us, 1000) 2109 s, ms = divmod(ms, 1000) 2110 m, s = divmod(s, 60) 2111 h, m = divmod(m, 60) 2112 d, h = divmod(h, 24) 2113 y, d = divmod(d, 365) 2114 c, y = divmod(y, 100) 2115 n, c = divmod(c, 10) 2116 time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}" 2117 spoken_time_part = [] 2118 if n > 0 or show_zeros_in_spoken_time: 2119 spoken_time_part.append(f"{n: 3d} {millennia}") 2120 if c > 0 or show_zeros_in_spoken_time: 2121 spoken_time_part.append(f"{c: 4d} {century}") 2122 if y > 0 or show_zeros_in_spoken_time: 2123 spoken_time_part.append(f"{y: 3d} {years}") 2124 if d > 0 or show_zeros_in_spoken_time: 2125 spoken_time_part.append(f"{d: 4d} {days}") 2126 if h > 0 or show_zeros_in_spoken_time: 2127 spoken_time_part.append(f"{h: 2d} {hours}") 2128 if m > 0 or show_zeros_in_spoken_time: 2129 spoken_time_part.append(f"{m: 2d} {minutes}") 2130 if s > 0 or show_zeros_in_spoken_time: 2131 spoken_time_part.append(f"{s: 2d} {seconds}") 2132 if ms > 0 or show_zeros_in_spoken_time: 2133 spoken_time_part.append(f"{ms: 3d} {milli_seconds}") 2134 if us > 0 or show_zeros_in_spoken_time: 2135 spoken_time_part.append(f"{us: 3d} {micro_seconds}") 2136 if ns > 0 or show_zeros_in_spoken_time: 2137 spoken_time_part.append(f"{ns: 3d} {nano_seconds}") 2138 return time_lapsed, spoken_time_separator.join(spoken_time_part)
REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 Convert NanoSeconds to Human Readable Time Format. A NanoSeconds is a unit of time in the International System of Units (SI) equal to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second. Its symbol is μs, sometimes simplified to us when Unicode is not available. A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond.
INPUT : ms (AKA: MilliSeconds) OUTPUT: tuple(string time_lapsed, string spoken_time) like format. OUTPUT Variables: time_lapsed, spoken_time
Example Input: duration_from_nanoseconds(ns) "Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds" Example Output: ('039:0001:047:325:05:02:03:456:789:012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds') duration_from_nanoseconds(1234567890123456789012)
2140 @staticmethod 2141 def day_to_time(day: int, month: int = 6, year: int = 2024) -> int: # افتراض أن الشهر هو يونيو والسنة 2024 2142 """ 2143 Convert a specific day, month, and year into a timestamp. 2144 2145 Parameters: 2146 day (int): The day of the month. 2147 month (int): The month of the year. Default is 6 (June). 2148 year (int): The year. Default is 2024. 2149 2150 Returns: 2151 int: The timestamp representing the given day, month, and year. 2152 2153 Note: 2154 This method assumes the default month and year if not provided. 2155 """ 2156 return ZakatTracker.time(datetime.datetime(year, month, day))
Convert a specific day, month, and year into a timestamp.
Parameters: day (int): The day of the month. month (int): The month of the year. Default is 6 (June). year (int): The year. Default is 2024.
Returns: int: The timestamp representing the given day, month, and year.
Note: This method assumes the default month and year if not provided.
2158 @staticmethod 2159 def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime: 2160 """ 2161 Generate a random date between two given dates. 2162 2163 Parameters: 2164 start_date (datetime.datetime): The start date from which to generate a random date. 2165 end_date (datetime.datetime): The end date until which to generate a random date. 2166 2167 Returns: 2168 datetime.datetime: A random date between the start_date and end_date. 2169 """ 2170 time_between_dates = end_date - start_date 2171 days_between_dates = time_between_dates.days 2172 random_number_of_days = random.randrange(days_between_dates) 2173 return start_date + datetime.timedelta(days=random_number_of_days)
Generate a random date between two given dates.
Parameters: start_date (datetime.datetime): The start date from which to generate a random date. end_date (datetime.datetime): The end date until which to generate a random date.
Returns: datetime.datetime: A random date between the start_date and end_date.
2175 @staticmethod 2176 def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False, 2177 debug: bool = False) -> int: 2178 """ 2179 Generate a random CSV file with specified parameters. 2180 2181 Parameters: 2182 path (str): The path where the CSV file will be saved. Default is "data.csv". 2183 count (int): The number of rows to generate in the CSV file. Default is 1000. 2184 with_rate (bool): If True, a random rate between 1.2% and 12% is added. Default is False. 2185 debug (bool): A flag indicating whether to print debug information. 2186 2187 Returns: 2188 None. The function generates a CSV file at the specified path with the given count of rows. 2189 Each row contains a randomly generated account, description, value, and date. 2190 The value is randomly generated between 1000 and 100000, 2191 and the date is randomly generated between 1950-01-01 and 2023-12-31. 2192 If the row number is not divisible by 13, the value is multiplied by -1. 2193 """ 2194 if debug: 2195 print('generate_random_csv_file', f'debug={debug}') 2196 i = 0 2197 with open(path, "w", newline="") as csvfile: 2198 writer = csv.writer(csvfile) 2199 for i in range(count): 2200 account = f"acc-{random.randint(1, 1000)}" 2201 desc = f"Some text {random.randint(1, 1000)}" 2202 value = random.randint(1000, 100000) 2203 date = ZakatTracker.generate_random_date(datetime.datetime(1000, 1, 1), 2204 datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S") 2205 if not i % 13 == 0: 2206 value *= -1 2207 row = [account, desc, value, date] 2208 if with_rate: 2209 rate = random.randint(1, 100) * 0.12 2210 if debug: 2211 print('before-append', row) 2212 row.append(rate) 2213 if debug: 2214 print('after-append', row) 2215 writer.writerow(row) 2216 i = i + 1 2217 return i
Generate a random CSV file with specified parameters.
Parameters: path (str): The path where the CSV file will be saved. Default is "data.csv". count (int): The number of rows to generate in the CSV file. Default is 1000. with_rate (bool): If True, a random rate between 1.2% and 12% is added. Default is False. debug (bool): A flag indicating whether to print debug information.
Returns: None. The function generates a CSV file at the specified path with the given count of rows. Each row contains a randomly generated account, description, value, and date. The value is randomly generated between 1000 and 100000, and the date is randomly generated between 1950-01-01 and 2023-12-31. If the row number is not divisible by 13, the value is multiplied by -1.
2219 @staticmethod 2220 def create_random_list(max_sum, min_value=0, max_value=10): 2221 """ 2222 Creates a list of random integers whose sum does not exceed the specified maximum. 2223 2224 Args: 2225 max_sum: The maximum allowed sum of the list elements. 2226 min_value: The minimum possible value for an element (inclusive). 2227 max_value: The maximum possible value for an element (inclusive). 2228 2229 Returns: 2230 A list of random integers. 2231 """ 2232 result = [] 2233 current_sum = 0 2234 2235 while current_sum < max_sum: 2236 # Calculate the remaining space for the next element 2237 remaining_sum = max_sum - current_sum 2238 # Determine the maximum possible value for the next element 2239 next_max_value = min(remaining_sum, max_value) 2240 # Generate a random element within the allowed range 2241 next_element = random.randint(min_value, next_max_value) 2242 result.append(next_element) 2243 current_sum += next_element 2244 2245 return result
Creates a list of random integers whose sum does not exceed the specified maximum.
Args: max_sum: The maximum allowed sum of the list elements. min_value: The minimum possible value for an element (inclusive). max_value: The maximum possible value for an element (inclusive).
Returns: A list of random integers.
2474 def test(self, debug: bool = False) -> bool: 2475 if debug: 2476 print('test', f'debug={debug}') 2477 try: 2478 2479 self._test_core(True, debug) 2480 self._test_core(False, debug) 2481 2482 assert self._history() 2483 2484 # Not allowed for duplicate transactions in the same account and time 2485 2486 created = ZakatTracker.time() 2487 self.track(100, 'test-1', 'same', True, created) 2488 failed = False 2489 try: 2490 self.track(50, 'test-1', 'same', True, created) 2491 except: 2492 failed = True 2493 assert failed is True 2494 2495 self.reset() 2496 2497 # Same account transfer 2498 for x in [1, 'a', True, 1.8, None]: 2499 failed = False 2500 try: 2501 self.transfer(1, x, x, 'same-account', debug=debug) 2502 except: 2503 failed = True 2504 assert failed is True 2505 2506 # Always preserve box age during transfer 2507 2508 series: list[tuple] = [ 2509 (30, 4), 2510 (60, 3), 2511 (90, 2), 2512 ] 2513 case = { 2514 3000: { 2515 'series': series, 2516 'rest': 15000, 2517 }, 2518 6000: { 2519 'series': series, 2520 'rest': 12000, 2521 }, 2522 9000: { 2523 'series': series, 2524 'rest': 9000, 2525 }, 2526 18000: { 2527 'series': series, 2528 'rest': 0, 2529 }, 2530 27000: { 2531 'series': series, 2532 'rest': -9000, 2533 }, 2534 36000: { 2535 'series': series, 2536 'rest': -18000, 2537 }, 2538 } 2539 2540 selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle() 2541 2542 for total in case: 2543 if debug: 2544 print('--------------------------------------------------------') 2545 print(f'case[{total}]', case[total]) 2546 for x in case[total]['series']: 2547 self.track( 2548 unscaled_value=x[0], 2549 desc=f"test-{x} ages", 2550 account='ages', 2551 logging=True, 2552 created=selected_time * x[1], 2553 ) 2554 2555 unscaled_total = self.unscale(total) 2556 if debug: 2557 print('unscaled_total', unscaled_total) 2558 refs = self.transfer( 2559 unscaled_amount=unscaled_total, 2560 from_account='ages', 2561 to_account='future', 2562 desc='Zakat Movement', 2563 debug=debug, 2564 ) 2565 2566 if debug: 2567 print('refs', refs) 2568 2569 ages_cache_balance = self.balance('ages') 2570 ages_fresh_balance = self.balance('ages', False) 2571 rest = case[total]['rest'] 2572 if debug: 2573 print('source', ages_cache_balance, ages_fresh_balance, rest) 2574 assert ages_cache_balance == rest 2575 assert ages_fresh_balance == rest 2576 2577 future_cache_balance = self.balance('future') 2578 future_fresh_balance = self.balance('future', False) 2579 if debug: 2580 print('target', future_cache_balance, future_fresh_balance, total) 2581 print('refs', refs) 2582 assert future_cache_balance == total 2583 assert future_fresh_balance == total 2584 2585 # TODO: check boxes times for `ages` should equal box times in `future` 2586 for ref in self._vault['account']['ages']['box']: 2587 ages_capital = self._vault['account']['ages']['box'][ref]['capital'] 2588 ages_rest = self._vault['account']['ages']['box'][ref]['rest'] 2589 future_capital = 0 2590 if ref in self._vault['account']['future']['box']: 2591 future_capital = self._vault['account']['future']['box'][ref]['capital'] 2592 future_rest = 0 2593 if ref in self._vault['account']['future']['box']: 2594 future_rest = self._vault['account']['future']['box'][ref]['rest'] 2595 if ages_capital != 0 and future_capital != 0 and future_rest != 0: 2596 if debug: 2597 print('================================================================') 2598 print('ages', ages_capital, ages_rest) 2599 print('future', future_capital, future_rest) 2600 if ages_rest == 0: 2601 assert ages_capital == future_capital 2602 elif ages_rest < 0: 2603 assert -ages_capital == future_capital 2604 elif ages_rest > 0: 2605 assert ages_capital == ages_rest + future_capital 2606 self.reset() 2607 assert len(self._vault['history']) == 0 2608 2609 assert self._history() 2610 assert self._history(False) is False 2611 assert self._history() is False 2612 assert self._history(True) 2613 assert self._history() 2614 if debug: 2615 print('####################################################################') 2616 2617 transaction = [ 2618 ( 2619 20, 'wallet', 1, -2000, -2000, -2000, 1, 1, 2620 2000, 2000, 2000, 1, 1, 2621 ), 2622 ( 2623 750, 'wallet', 'safe', -77000, -77000, -77000, 2, 2, 2624 75000, 75000, 75000, 1, 1, 2625 ), 2626 ( 2627 600, 'safe', 'bank', 15000, 15000, 15000, 1, 2, 2628 60000, 60000, 60000, 1, 1, 2629 ), 2630 ] 2631 for z in transaction: 2632 self.lock() 2633 x = z[1] 2634 y = z[2] 2635 self.transfer( 2636 unscaled_amount=z[0], 2637 from_account=x, 2638 to_account=y, 2639 desc='test-transfer', 2640 debug=debug, 2641 ) 2642 zz = self.balance(x) 2643 if debug: 2644 print(zz, z) 2645 assert zz == z[3] 2646 xx = self.accounts()[x] 2647 assert xx == z[3] 2648 assert self.balance(x, False) == z[4] 2649 assert xx == z[4] 2650 2651 s = 0 2652 log = self._vault['account'][x]['log'] 2653 for i in log: 2654 s += log[i]['value'] 2655 if debug: 2656 print('s', s, 'z[5]', z[5]) 2657 assert s == z[5] 2658 2659 assert self.box_size(x) == z[6] 2660 assert self.log_size(x) == z[7] 2661 2662 yy = self.accounts()[y] 2663 assert self.balance(y) == z[8] 2664 assert yy == z[8] 2665 assert self.balance(y, False) == z[9] 2666 assert yy == z[9] 2667 2668 s = 0 2669 log = self._vault['account'][y]['log'] 2670 for i in log: 2671 s += log[i]['value'] 2672 assert s == z[10] 2673 2674 assert self.box_size(y) == z[11] 2675 assert self.log_size(y) == z[12] 2676 assert self.free(self.lock()) 2677 2678 if debug: 2679 pp().pprint(self.check(2.17)) 2680 2681 assert not self.nolock() 2682 history_count = len(self._vault['history']) 2683 if debug: 2684 print('history-count', history_count) 2685 assert history_count == 4 2686 assert not self.free(ZakatTracker.time()) 2687 assert self.free(self.lock()) 2688 assert self.nolock() 2689 assert len(self._vault['history']) == 3 2690 2691 # storage 2692 2693 _path = self.path(f'test.{self.ext()}') 2694 if os.path.exists(_path): 2695 os.remove(_path) 2696 self.save() 2697 assert os.path.getsize(_path) > 0 2698 self.reset() 2699 assert self.recall(False, debug) is False 2700 self.load() 2701 assert self._vault['account'] is not None 2702 2703 # recall 2704 2705 assert self.nolock() 2706 assert len(self._vault['history']) == 3 2707 assert self.recall(False, debug) is True 2708 assert len(self._vault['history']) == 2 2709 assert self.recall(False, debug) is True 2710 assert len(self._vault['history']) == 1 2711 assert self.recall(False, debug) is True 2712 assert len(self._vault['history']) == 0 2713 assert self.recall(False, debug) is False 2714 assert len(self._vault['history']) == 0 2715 2716 # exchange 2717 2718 self.exchange("cash", 25, 3.75, "2024-06-25") 2719 self.exchange("cash", 22, 3.73, "2024-06-22") 2720 self.exchange("cash", 15, 3.69, "2024-06-15") 2721 self.exchange("cash", 10, 3.66) 2722 2723 for i in range(1, 30): 2724 exchange = self.exchange("cash", i) 2725 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2726 if debug: 2727 print(i, rate, description, created) 2728 assert created 2729 if i < 10: 2730 assert rate == 1 2731 assert description is None 2732 elif i == 10: 2733 assert rate == 3.66 2734 assert description is None 2735 elif i < 15: 2736 assert rate == 3.66 2737 assert description is None 2738 elif i == 15: 2739 assert rate == 3.69 2740 assert description is not None 2741 elif i < 22: 2742 assert rate == 3.69 2743 assert description is not None 2744 elif i == 22: 2745 assert rate == 3.73 2746 assert description is not None 2747 elif i >= 25: 2748 assert rate == 3.75 2749 assert description is not None 2750 exchange = self.exchange("bank", i) 2751 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2752 if debug: 2753 print(i, rate, description, created) 2754 assert created 2755 assert rate == 1 2756 assert description is None 2757 2758 assert len(self._vault['exchange']) > 0 2759 assert len(self.exchanges()) > 0 2760 self._vault['exchange'].clear() 2761 assert len(self._vault['exchange']) == 0 2762 assert len(self.exchanges()) == 0 2763 2764 # حفظ أسعار الصرف باستخدام التواريخ بالنانو ثانية 2765 self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25") 2766 self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22") 2767 self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15") 2768 self.exchange("cash", ZakatTracker.day_to_time(10), 3.66) 2769 2770 for i in [x * 0.12 for x in range(-15, 21)]: 2771 if i <= 0: 2772 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0 2773 else: 2774 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0 2775 2776 # اختبار النتائج باستخدام التواريخ بالنانو ثانية 2777 for i in range(1, 31): 2778 timestamp_ns = ZakatTracker.day_to_time(i) 2779 exchange = self.exchange("cash", timestamp_ns) 2780 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2781 if debug: 2782 print(i, rate, description, created) 2783 assert created 2784 if i < 10: 2785 assert rate == 1 2786 assert description is None 2787 elif i == 10: 2788 assert rate == 3.66 2789 assert description is None 2790 elif i < 15: 2791 assert rate == 3.66 2792 assert description is None 2793 elif i == 15: 2794 assert rate == 3.69 2795 assert description is not None 2796 elif i < 22: 2797 assert rate == 3.69 2798 assert description is not None 2799 elif i == 22: 2800 assert rate == 3.73 2801 assert description is not None 2802 elif i >= 25: 2803 assert rate == 3.75 2804 assert description is not None 2805 exchange = self.exchange("bank", i) 2806 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2807 if debug: 2808 print(i, rate, description, created) 2809 assert created 2810 assert rate == 1 2811 assert description is None 2812 2813 # csv 2814 2815 csv_count = 1000 2816 2817 for with_rate, path in { 2818 False: 'test-import_csv-no-exchange', 2819 True: 'test-import_csv-with-exchange', 2820 }.items(): 2821 2822 if debug: 2823 print('test_import_csv', with_rate, path) 2824 2825 csv_path = path + '.csv' 2826 if os.path.exists(csv_path): 2827 os.remove(csv_path) 2828 c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug) 2829 if debug: 2830 print('generate_random_csv_file', c) 2831 assert c == csv_count 2832 assert os.path.getsize(csv_path) > 0 2833 cache_path = self.import_csv_cache_path() 2834 if os.path.exists(cache_path): 2835 os.remove(cache_path) 2836 self.reset() 2837 (created, found, bad) = self.import_csv(csv_path, debug) 2838 bad_count = len(bad) 2839 assert bad_count > 0 2840 if debug: 2841 print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})") 2842 print('bad', bad) 2843 tmp_size = os.path.getsize(cache_path) 2844 assert tmp_size > 0 2845 # TODO: assert created + found + bad_count == csv_count 2846 # TODO: assert created == csv_count 2847 # TODO: assert bad_count == 0 2848 (created_2, found_2, bad_2) = self.import_csv(csv_path) 2849 bad_2_count = len(bad_2) 2850 if debug: 2851 print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})") 2852 print('bad', bad) 2853 assert bad_2_count > 0 2854 # TODO: assert tmp_size == os.path.getsize(cache_path) 2855 # TODO: assert created_2 + found_2 + bad_2_count == csv_count 2856 # TODO: assert created == found_2 2857 # TODO: assert bad_count == bad_2_count 2858 # TODO: assert found_2 == csv_count 2859 # TODO: assert bad_2_count == 0 2860 # TODO: assert created_2 == 0 2861 2862 # payment parts 2863 2864 positive_parts = self.build_payment_parts(100, positive_only=True) 2865 assert self.check_payment_parts(positive_parts) != 0 2866 assert self.check_payment_parts(positive_parts) != 0 2867 all_parts = self.build_payment_parts(300, positive_only=False) 2868 assert self.check_payment_parts(all_parts) != 0 2869 assert self.check_payment_parts(all_parts) != 0 2870 if debug: 2871 pp().pprint(positive_parts) 2872 pp().pprint(all_parts) 2873 # dynamic discount 2874 suite = [] 2875 count = 3 2876 for exceed in [False, True]: 2877 case = [] 2878 for parts in [positive_parts, all_parts]: 2879 part = parts.copy() 2880 demand = part['demand'] 2881 if debug: 2882 print(demand, part['total']) 2883 i = 0 2884 z = demand / count 2885 cp = { 2886 'account': {}, 2887 'demand': demand, 2888 'exceed': exceed, 2889 'total': part['total'], 2890 } 2891 j = '' 2892 for x, y in part['account'].items(): 2893 x_exchange = self.exchange(x) 2894 zz = self.exchange_calc(z, 1, x_exchange['rate']) 2895 if exceed and zz <= demand: 2896 i += 1 2897 y['part'] = zz 2898 if debug: 2899 print(exceed, y) 2900 cp['account'][x] = y 2901 case.append(y) 2902 elif not exceed and y['balance'] >= zz: 2903 i += 1 2904 y['part'] = zz 2905 if debug: 2906 print(exceed, y) 2907 cp['account'][x] = y 2908 case.append(y) 2909 j = x 2910 if i >= count: 2911 break 2912 if len(cp['account'][j]) > 0: 2913 suite.append(cp) 2914 if debug: 2915 print('suite', len(suite)) 2916 # vault = self._vault.copy() 2917 for case in suite: 2918 # self._vault = vault.copy() 2919 if debug: 2920 print('case', case) 2921 result = self.check_payment_parts(case) 2922 if debug: 2923 print('check_payment_parts', result, f'exceed: {exceed}') 2924 assert result == 0 2925 2926 report = self.check(2.17, None, debug) 2927 (valid, brief, plan) = report 2928 if debug: 2929 print('valid', valid) 2930 zakat_result = self.zakat(report, parts=case, debug=debug) 2931 if debug: 2932 print('zakat-result', zakat_result) 2933 assert valid == zakat_result 2934 2935 assert self.save(path + f'.{self.ext()}') 2936 assert self.export_json(path + '.json') 2937 2938 assert self.export_json("1000-transactions-test.json") 2939 assert self.save(f"1000-transactions-test.{self.ext()}") 2940 2941 self.reset() 2942 2943 # test transfer between accounts with different exchange rate 2944 2945 a_SAR = "Bank (SAR)" 2946 b_USD = "Bank (USD)" 2947 c_SAR = "Safe (SAR)" 2948 # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer 2949 for case in [ 2950 (0, a_SAR, "SAR Gift", 1000, 100000), 2951 (1, a_SAR, 1), 2952 (0, b_USD, "USD Gift", 500, 50000), 2953 (1, b_USD, 1), 2954 (2, b_USD, 3.75), 2955 (1, b_USD, 3.75), 2956 (3, 100, b_USD, a_SAR, "100 USD -> SAR", 40000, 137500), 2957 (0, c_SAR, "Salary", 750, 75000), 2958 (3, 375, c_SAR, b_USD, "375 SAR -> USD", 37500, 50000), 2959 (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 137125, 50100), 2960 ]: 2961 if debug: 2962 print('case', case) 2963 match (case[0]): 2964 case 0: # track 2965 _, account, desc, x, balance = case 2966 self.track(unscaled_value=x, desc=desc, account=account, debug=debug) 2967 2968 cached_value = self.balance(account, cached=True) 2969 fresh_value = self.balance(account, cached=False) 2970 if debug: 2971 print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value) 2972 assert cached_value == balance 2973 assert fresh_value == balance 2974 case 1: # check-exchange 2975 _, account, expected_rate = case 2976 t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2977 if debug: 2978 print('t-exchange', t_exchange) 2979 assert t_exchange['rate'] == expected_rate 2980 case 2: # do-exchange 2981 _, account, rate = case 2982 self.exchange(account, rate=rate, debug=debug) 2983 b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2984 if debug: 2985 print('b-exchange', b_exchange) 2986 assert b_exchange['rate'] == rate 2987 case 3: # transfer 2988 _, x, a, b, desc, a_balance, b_balance = case 2989 self.transfer(x, a, b, desc, debug=debug) 2990 2991 cached_value = self.balance(a, cached=True) 2992 fresh_value = self.balance(a, cached=False) 2993 if debug: 2994 print('account', a, 'cached_value', cached_value, 'fresh_value', fresh_value, 'a_balance', a_balance) 2995 assert cached_value == a_balance 2996 assert fresh_value == a_balance 2997 2998 cached_value = self.balance(b, cached=True) 2999 fresh_value = self.balance(b, cached=False) 3000 if debug: 3001 print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value) 3002 assert cached_value == b_balance 3003 assert fresh_value == b_balance 3004 3005 # Transfer all in many chunks randomly from B to A 3006 a_SAR_balance = 137125 3007 b_USD_balance = 50100 3008 b_USD_exchange = self.exchange(b_USD) 3009 amounts = ZakatTracker.create_random_list(b_USD_balance, max_value=1000) 3010 if debug: 3011 print('amounts', amounts) 3012 i = 0 3013 for x in amounts: 3014 if debug: 3015 print(f'{i} - transfer-with-exchange({x})') 3016 self.transfer( 3017 unscaled_amount=self.unscale(x), 3018 from_account=b_USD, 3019 to_account=a_SAR, 3020 desc=f"{x} USD -> SAR", 3021 debug=debug, 3022 ) 3023 3024 b_USD_balance -= x 3025 cached_value = self.balance(b_USD, cached=True) 3026 fresh_value = self.balance(b_USD, cached=False) 3027 if debug: 3028 print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3029 b_USD_balance) 3030 assert cached_value == b_USD_balance 3031 assert fresh_value == b_USD_balance 3032 3033 a_SAR_balance += int(x * b_USD_exchange['rate']) 3034 cached_value = self.balance(a_SAR, cached=True) 3035 fresh_value = self.balance(a_SAR, cached=False) 3036 if debug: 3037 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3038 a_SAR_balance, 'rate', b_USD_exchange['rate']) 3039 assert cached_value == a_SAR_balance 3040 assert fresh_value == a_SAR_balance 3041 i += 1 3042 3043 # Transfer all in many chunks randomly from C to A 3044 c_SAR_balance = 37500 3045 amounts = ZakatTracker.create_random_list(c_SAR_balance, max_value=1000) 3046 if debug: 3047 print('amounts', amounts) 3048 i = 0 3049 for x in amounts: 3050 if debug: 3051 print(f'{i} - transfer-with-exchange({x})') 3052 self.transfer( 3053 unscaled_amount=self.unscale(x), 3054 from_account=c_SAR, 3055 to_account=a_SAR, 3056 desc=f"{x} SAR -> a_SAR", 3057 debug=debug, 3058 ) 3059 3060 c_SAR_balance -= x 3061 cached_value = self.balance(c_SAR, cached=True) 3062 fresh_value = self.balance(c_SAR, cached=False) 3063 if debug: 3064 print('account', c_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3065 c_SAR_balance) 3066 assert cached_value == c_SAR_balance 3067 assert fresh_value == c_SAR_balance 3068 3069 a_SAR_balance += x 3070 cached_value = self.balance(a_SAR, cached=True) 3071 fresh_value = self.balance(a_SAR, cached=False) 3072 if debug: 3073 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3074 a_SAR_balance) 3075 assert cached_value == a_SAR_balance 3076 assert fresh_value == a_SAR_balance 3077 i += 1 3078 3079 assert self.export_json("accounts-transfer-with-exchange-rates.json") 3080 assert self.save(f"accounts-transfer-with-exchange-rates.{self.ext()}") 3081 3082 # check & zakat with exchange rates for many cycles 3083 3084 for rate, values in { 3085 1: { 3086 'in': [1000, 2000, 10000], 3087 'exchanged': [100000, 200000, 1000000], 3088 'out': [2500, 5000, 73140], 3089 }, 3090 3.75: { 3091 'in': [200, 1000, 5000], 3092 'exchanged': [75000, 375000, 1875000], 3093 'out': [1875, 9375, 137138], 3094 }, 3095 }.items(): 3096 a, b, c = values['in'] 3097 m, n, o = values['exchanged'] 3098 x, y, z = values['out'] 3099 if debug: 3100 print('rate', rate, 'values', values) 3101 for case in [ 3102 (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3103 {'safe': {0: {'below_nisab': x}}}, 3104 ], False, m), 3105 (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3106 {'safe': {0: {'count': 1, 'total': y}}}, 3107 ], True, n), 3108 (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [ 3109 {'cave': {0: {'count': 3, 'total': z}}}, 3110 ], True, o), 3111 ]: 3112 if debug: 3113 print(f"############# check(rate: {rate}) #############") 3114 print('case', case) 3115 self.reset() 3116 self.exchange(account=case[1], created=case[2], rate=rate) 3117 self.track(unscaled_value=case[0], desc='test-check', account=case[1], logging=True, created=case[2]) 3118 assert self.snapshot() 3119 3120 # assert self.nolock() 3121 # history_size = len(self._vault['history']) 3122 # print('history_size', history_size) 3123 # assert history_size == 2 3124 assert self.lock() 3125 assert not self.nolock() 3126 report = self.check(2.17, None, debug) 3127 (valid, brief, plan) = report 3128 if debug: 3129 print('brief', brief) 3130 assert valid == case[4] 3131 assert case[5] == brief[0] 3132 assert case[5] == brief[1] 3133 3134 if debug: 3135 pp().pprint(plan) 3136 3137 for x in plan: 3138 assert case[1] == x 3139 if 'total' in case[3][0][x][0].keys(): 3140 assert case[3][0][x][0]['total'] == int(brief[2]) 3141 assert int(plan[x][0]['total']) == case[3][0][x][0]['total'] 3142 assert int(plan[x][0]['count']) == case[3][0][x][0]['count'] 3143 else: 3144 assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab'] 3145 if debug: 3146 pp().pprint(report) 3147 result = self.zakat(report, debug=debug) 3148 if debug: 3149 print('zakat-result', result, case[4]) 3150 assert result == case[4] 3151 report = self.check(2.17, None, debug) 3152 (valid, brief, plan) = report 3153 assert valid is False 3154 3155 history_size = len(self._vault['history']) 3156 if debug: 3157 print('history_size', history_size) 3158 assert history_size == 3 3159 assert not self.nolock() 3160 assert self.recall(False, debug) is False 3161 self.free(self.lock()) 3162 assert self.nolock() 3163 3164 for i in range(3, 0, -1): 3165 history_size = len(self._vault['history']) 3166 if debug: 3167 print('history_size', history_size) 3168 assert history_size == i 3169 assert self.recall(False, debug) is True 3170 3171 assert self.nolock() 3172 assert self.recall(False, debug) is False 3173 3174 history_size = len(self._vault['history']) 3175 if debug: 3176 print('history_size', history_size) 3177 assert history_size == 0 3178 3179 account_size = len(self._vault['account']) 3180 if debug: 3181 print('account_size', account_size) 3182 assert account_size == 0 3183 3184 report_size = len(self._vault['report']) 3185 if debug: 3186 print('report_size', report_size) 3187 assert report_size == 0 3188 3189 assert self.nolock() 3190 return True 3191 except: 3192 # pp().pprint(self._vault) 3193 assert self.export_json("test-snapshot.json") 3194 assert self.save(f"test-snapshot.{self.ext()}") 3195 raise
3198def test(debug: bool = False): 3199 ledger = ZakatTracker() 3200 start = ZakatTracker.time() 3201 assert ledger.test(debug=debug) 3202 if debug: 3203 print("#########################") 3204 print("######## TEST DONE ########") 3205 print("#########################") 3206 print(ZakatTracker.duration_from_nanoseconds(ZakatTracker.time() - start)) 3207 print("#########################")
77class Action(Enum): 78 CREATE = auto() 79 TRACK = auto() 80 LOG = auto() 81 SUB = auto() 82 ADD_FILE = auto() 83 REMOVE_FILE = auto() 84 BOX_TRANSFER = auto() 85 EXCHANGE = auto() 86 REPORT = auto() 87 ZAKAT = auto()
90class JSONEncoder(json.JSONEncoder): 91 def default(self, obj): 92 if isinstance(obj, Action) or isinstance(obj, MathOperation): 93 return obj.name # Serialize as the enum member's name 94 elif isinstance(obj, Decimal): 95 return float(obj) 96 return super().default(obj)
Extensible JSON https://json.org encoder for Python data structures.
Supports the following objects and types by default:
+-------------------+---------------+ | Python | JSON | +===================+===============+ | dict | object | +-------------------+---------------+ | list, tuple | array | +-------------------+---------------+ | str | string | +-------------------+---------------+ | int, float | number | +-------------------+---------------+ | True | true | +-------------------+---------------+ | False | false | +-------------------+---------------+ | None | null | +-------------------+---------------+
To extend this to recognize other objects, subclass and implement a
.default()
method with another method that returns a serializable
object for o
if possible, otherwise it should call the superclass
implementation (to raise TypeError
).
91 def default(self, obj): 92 if isinstance(obj, Action) or isinstance(obj, MathOperation): 93 return obj.name # Serialize as the enum member's name 94 elif isinstance(obj, Decimal): 95 return float(obj) 96 return super().default(obj)
Implement this method in a subclass such that it returns
a serializable object for o
, or calls the base implementation
(to raise a TypeError
).
For example, to support arbitrary iterators, you could implement default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
# Let the base class default method raise the TypeError
return super().default(o)
55def start_file_server(database_path: str, database_callback: callable = None, csv_callback: callable = None, 56 debug: bool = False) -> tuple: 57 """ 58 Starts a multi-purpose HTTP server to manage file interactions for a Zakat application. 59 60 This server facilitates the following functionalities: 61 62 1. GET /{file_uuid}/get: Download the database file specified by `database_path`. 63 2. GET /{file_uuid}/upload: Display an HTML form for uploading files. 64 3. POST /{file_uuid}/upload: Handle file uploads, distinguishing between: 65 - Database File (.db): Replaces the existing database with the uploaded one. 66 - CSV File (.csv): Imports data from the CSV into the existing database. 67 68 Args: 69 database_path (str): The path to the pickle database file. 70 database_callback (callable, optional): A function to call after a successful database upload. 71 It receives the uploaded database path as its argument. 72 csv_callback (callable, optional): A function to call after a successful CSV upload. It receives the uploaded CSV path, 73 the database path, and the debug flag as its arguments. 74 debug (bool, optional): If True, print debugging information. Defaults to False. 75 76 Returns: 77 Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing: 78 - file_name (str): The name of the database file. 79 - download_url (str): The URL to download the database file. 80 - upload_url (str): The URL to access the file upload form. 81 - server_thread (threading.Thread): The thread running the server. 82 - shutdown_server (Callable[[], None]): A function to gracefully shut down the server. 83 84 Example: 85 _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db") 86 print(f"Download database: {download_url}") 87 print(f"Upload files: {upload_url}") 88 server_thread.start() 89 # ... later ... 90 shutdown_server() 91 """ 92 file_uuid = uuid.uuid4() 93 file_name = os.path.basename(database_path) 94 95 port = find_available_port() 96 download_url = f"http://localhost:{port}/{file_uuid}/get" 97 upload_url = f"http://localhost:{port}/{file_uuid}/upload" 98 99 class Handler(http.server.SimpleHTTPRequestHandler): 100 def do_GET(self): 101 if self.path == f"/{file_uuid}/get": 102 # GET: Serve the existing file 103 try: 104 with open(database_path, "rb") as f: 105 self.send_response(200) 106 self.send_header("Content-type", "application/octet-stream") 107 self.send_header("Content-Disposition", f'attachment; filename="{file_name}"') 108 self.end_headers() 109 self.wfile.write(f.read()) 110 except FileNotFoundError: 111 self.send_error(404, "File not found") 112 elif self.path == f"/{file_uuid}/upload": 113 # GET: Serve the upload form 114 self.send_response(200) 115 self.send_header("Content-type", "text/html") 116 self.end_headers() 117 self.wfile.write(f""" 118 <html lang="en"> 119 <head> 120 <title>Zakat File Server</title> 121 </head> 122 <body> 123 <h1>Zakat File Server</h1> 124 <h3>You can download the <a target="__blank" href="{download_url}">database file</a>...</h3> 125 <h3>Or upload a new file to restore a database or import `CSV` file:</h3> 126 <form action="/{file_uuid}/upload" method="post" enctype="multipart/form-data"> 127 <input type="file" name="file" required><br/> 128 <input type="radio" id="{FileType.Database.value}" name="upload_type" value="{FileType.Database.value}" required> 129 <label for="database">Database File</label><br/> 130 <input type="radio"id="{FileType.CSV.value}" name="upload_type" value="{FileType.CSV.value}"> 131 <label for="csv">CSV File</label><br/> 132 <input type="submit" value="Upload"><br/> 133 </form> 134 </body></html> 135 """.encode()) 136 else: 137 self.send_error(404) 138 139 def do_POST(self): 140 if self.path == f"/{file_uuid}/upload": 141 # POST: Handle request 142 # 1. Get the Form Data 143 form_data = cgi.FieldStorage( 144 fp=self.rfile, 145 headers=self.headers, 146 environ={'REQUEST_METHOD': 'POST'} 147 ) 148 upload_type = form_data.getvalue("upload_type") 149 150 if debug: 151 print('upload_type', upload_type) 152 153 if upload_type not in [FileType.Database.value, FileType.CSV.value]: 154 self.send_error(400, "Invalid upload type") 155 return 156 157 # 2. Extract File Data 158 file_item = form_data['file'] # Assuming 'file' is your file input name 159 160 # 3. Get File Details 161 filename = file_item.filename 162 file_data = file_item.file.read() # Read the file's content 163 164 if debug: 165 print(f'Uploaded filename: {filename}') 166 167 # 4. Define Storage Path for CSV 168 upload_directory = "./uploads" # Create this directory if it doesn't exist 169 os.makedirs(upload_directory, exist_ok=True) 170 file_path = os.path.join(upload_directory, upload_type) 171 172 # 5. Write to Disk 173 with open(file_path, 'wb') as f: 174 f.write(file_data) 175 176 match upload_type: 177 case FileType.Database.value: 178 179 try: 180 # 6. Verify database file 181 # ZakatTracker(db_path=file_path) # FATAL, Circular Imports Error 182 if database_callback is not None: 183 database_callback(file_path) 184 185 # 7. Copy database into the original path 186 shutil.copy2(file_path, database_path) 187 except Exception as e: 188 self.send_error(400, str(e)) 189 return 190 191 case FileType.CSV.value: 192 # 6. Verify CSV file 193 try: 194 # x = ZakatTracker(db_path=database_path) # FATAL, Circular Imports Error 195 # result = x.import_csv(file_path, debug=debug) 196 if csv_callback is not None: 197 result = csv_callback(file_path, database_path, debug) 198 if debug: 199 print(f'CSV imported: {result}') 200 if len(result[2]) != 0: 201 self.send_response(200) 202 self.end_headers() 203 self.wfile.write(json.dumps(result).encode()) 204 return 205 except Exception as e: 206 self.send_error(400, str(e)) 207 return 208 209 self.send_response(200) 210 self.end_headers() 211 self.wfile.write(b"File uploaded successfully.") 212 213 httpd = socketserver.TCPServer(("localhost", port), Handler) 214 server_thread = threading.Thread(target=httpd.serve_forever) 215 216 def shutdown_server(): 217 nonlocal httpd, server_thread 218 httpd.shutdown() 219 httpd.server_close() # Close the socket 220 server_thread.join() # Wait for the thread to finish 221 222 return file_name, download_url, upload_url, server_thread, shutdown_server
Starts a multi-purpose HTTP server to manage file interactions for a Zakat application.
This server facilitates the following functionalities:
- GET /{file_uuid}/get: Download the database file specified by
database_path
. - GET /{file_uuid}/upload: Display an HTML form for uploading files.
- POST /{file_uuid}/upload: Handle file uploads, distinguishing between:
- Database File (.db): Replaces the existing database with the uploaded one.
- CSV File (.csv): Imports data from the CSV into the existing database.
Args: database_path (str): The path to the pickle database file. database_callback (callable, optional): A function to call after a successful database upload. It receives the uploaded database path as its argument. csv_callback (callable, optional): A function to call after a successful CSV upload. It receives the uploaded CSV path, the database path, and the debug flag as its arguments. debug (bool, optional): If True, print debugging information. Defaults to False.
Returns: Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing: - file_name (str): The name of the database file. - download_url (str): The URL to download the database file. - upload_url (str): The URL to access the file upload form. - server_thread (threading.Thread): The thread running the server. - shutdown_server (Callable[[], None]): A function to gracefully shut down the server.
Example: _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db") print(f"Download database: {download_url}") print(f"Upload files: {upload_url}") server_thread.start() # ... later ... shutdown_server()
32def find_available_port() -> int: 33 """ 34 Finds and returns an available TCP port on the local machine. 35 36 This function utilizes a TCP server socket to bind to port 0, which 37 instructs the operating system to automatically assign an available 38 port. The assigned port is then extracted and returned. 39 40 Returns: 41 int: The available TCP port number. 42 43 Raises: 44 OSError: If an error occurs during the port binding process, such 45 as all ports being in use. 46 47 Example: 48 port = find_available_port() 49 print(f"Available port: {port}") 50 """ 51 with socketserver.TCPServer(("localhost", 0), None) as s: 52 return s.server_address[1]
Finds and returns an available TCP port on the local machine.
This function utilizes a TCP server socket to bind to port 0, which instructs the operating system to automatically assign an available port. The assigned port is then extracted and returned.
Returns: int: The available TCP port number.
Raises: OSError: If an error occurs during the port binding process, such as all ports being in use.
Example: port = find_available_port() print(f"Available port: {port}")