zakat
 _____     _         _     _     _ _
|__  /__ _| | ____ _| |_  | |   (_) |__  _ __ __ _ _ __ _   _
  / // _` | |/ / _` | __| | |   | | '_ \| '__/ _` | '__| | | |
 / /| (_| |   < (_| | |_  | |___| | |_) | | | (_| | |  | |_| |
/____\__,_|_|\_\__,_|\__| |_____|_|_.__/|_|  \__,_|_|   \__, |
                                                        |___/

"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف
(Surah Al-A'raf, 7:89: "Our Lord, decide between us and our people in truth, and You are the best of those who give decision.")
... Never Trust, Always Verify ...
This file provides the ZakatLibrary classes and functions for tracking and calculating Zakat.
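As a rough orientation before the source listing, the core arithmetic documented for the library (a 2.5% cut, a Nisab threshold of 595 grams of silver, and a 355-day lunar cycle measured in nanoseconds) can be sketched with a few standalone functions. The names `nisab`, `zakat_cut` and `time_cycle` mirror the static methods documented in `ZakatTracker`; `zakat_due` and the sample prices are hypothetical and are not part of the library, and the eligibility rule shown is a simplification.

```python
# Standalone sketch: mirrors the formulas documented for ZakatTracker's
# static helpers, but does not import the zakat package.

NS_PER_DAY = 86_400 * 10**9  # nanoseconds in one day


def nisab(gram_price: float, gram_quantity: float = 595) -> float:
    """Threshold value: price per gram multiplied by 595 grams of silver."""
    return gram_price * gram_quantity


def zakat_cut(x: float) -> float:
    """2.5% of the asset value."""
    return 0.025 * x


def time_cycle(days: int = 355) -> int:
    """Approximate lunar year expressed in nanoseconds."""
    return days * NS_PER_DAY


def zakat_due(wealth: float, gram_price: float, held_ns: int) -> float:
    """Hypothetical helper: the cut applies only at or above the Nisab
    and only after the wealth has been held for a full cycle."""
    if wealth < nisab(gram_price) or held_ns < time_cycle():
        return 0.0
    return zakat_cut(wealth)


if __name__ == "__main__":
    silver_price = 2.5  # assumed price per gram, in local currency
    print("nisab:", nisab(silver_price))
    print("due on 10,000 held a full cycle:", zakat_due(10_000, silver_price, time_cycle()))
    print("due on 1,000 (below nisab):", zakat_due(1_000, silver_price, time_cycle()))
```

The real tracker applies this per transaction "box" and records each deduction; the sketch only shows the threshold and rate.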
"""
 _____     _         _     _     _ _
|__  /__ _| | ____ _| |_  | |   (_) |__  _ __ __ _ _ __ _   _
  / // _` | |/ / _` | __| | |   | | '_ \| '__/ _` | '__| | | |
 / /| (_| |   < (_| | |_  | |___| | |_) | | | (_| | |  | |_| |
/____\__,_|_|\_\__,_|\__| |_____|_|_.__/|_|  \__,_|_|   \__, |
                                                        |___/

"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف
... Never Trust, Always Verify ...

This file provides the ZakatLibrary classes and functions for tracking and calculating Zakat.
"""
# Importing necessary classes and functions from the main module
from zakat.zakat_tracker import (
    ZakatTracker,
    test,
    Action,
    JSONEncoder,
    MathOperation,
)

from zakat.file_server import (
    start_file_server,
    find_available_port,
    FileType,
)

# Version information for the module
__version__ = ZakatTracker.Version()
__all__ = [
    "ZakatTracker",
    "test",
    "Action",
    "JSONEncoder",
    "MathOperation",
    "start_file_server",
    "find_available_port",
    "FileType",
]
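The `ZakatTracker` class listed next stores amounts as integers (note the `int` fields in its `_vault` structure) and converts at the boundary with `scale` and `unscale`. The following is a minimal standalone sketch of that fixed-point round trip, assuming two decimal places. It reimplements the documented behaviour for illustration rather than importing the library; `to_minor_units` and `from_minor_units` are hypothetical names, and the reverse conversion here returns a `Decimal`, which is a choice made for this sketch.

```python
from decimal import Decimal
from typing import Union

Number = Union[float, int, Decimal]


def to_minor_units(x: Number, decimal_places: int = 2) -> int:
    """Round x to decimal_places, then shift the decimal point to get an integer."""
    if not isinstance(x, (float, int, Decimal)):
        raise TypeError("x must be a float, int, or Decimal")
    # Formatting first fixes the number of fractional digits, so the
    # multiplication below is exact in Decimal arithmetic.
    return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))


def from_minor_units(x: int, decimal_places: int = 2) -> Decimal:
    """Shift the decimal point back, staying in exact Decimal arithmetic."""
    return Decimal(x) / (10 ** decimal_places)


if __name__ == "__main__":
    # Binary floats cannot represent 0.1 or 0.2 exactly...
    print(0.1 + 0.2 == 0.3)
    # ...but the scaled integers add up as expected.
    print(to_minor_units(0.1) + to_minor_units(0.2) == to_minor_units(0.3))
    print(from_minor_units(to_minor_units(19.99)))
```

Keeping balances as scaled integers means additions and subtractions inside the vault never accumulate binary floating-point error; only the final conversion for display touches fractional values.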
class ZakatTracker:
    """
    A class for tracking and calculating Zakat.

    This class provides functionalities for recording transactions, calculating Zakat due,
    and managing account balances. It also offers features like importing transactions from
    CSV files, exporting data to JSON format, and saving/loading the tracker state.

    The `ZakatTracker` class is designed to handle both positive and negative transactions,
    allowing for flexible tracking of financial activities related to Zakat. It also supports
    the concept of a "Nisab" (the minimum threshold for Zakat) and a "haul" (one complete year
    elapsed for a transaction), and can calculate Zakat due based on the current silver price.

    The class uses a camel file as its database to persist the tracker state,
    ensuring data integrity across sessions. It also provides options for enabling or
    disabling history tracking, allowing users to choose their preferred level of detail.

    In addition, the `ZakatTracker` class includes various helper methods like
    `time`, `time_to_datetime`, `lock`, `free`, `recall`, `export_json`,
    and more. These methods provide additional functionalities and flexibility
    for interacting with and managing the Zakat tracker.

    Attributes:
        ZakatTracker.ZakatCut (function): A function to calculate the Zakat percentage.
        ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat.
        ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price.
        ZakatTracker.Version (function): The version of the ZakatTracker class.

    Data Structure:
        The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data.

        _vault (dict):
            - account (dict):
                - {account_number} (dict):
                    - balance (int): The current balance of the account.
                    - box (dict): A dictionary storing transaction details.
                        - {timestamp} (dict):
                            - capital (int): The initial amount of the transaction.
                            - count (int): The number of times Zakat has been calculated for this transaction.
                            - last (int): The timestamp of the last Zakat calculation.
                            - rest (int): The remaining amount after Zakat deductions and withdrawal.
                            - total (int): The total Zakat deducted from this transaction.
                    - count (int): The total number of transactions for the account.
                    - log (dict): A dictionary storing transaction logs.
                        - {timestamp} (dict):
                            - value (int): The transaction amount (positive or negative).
                            - desc (str): The description of the transaction.
                            - ref (int): The box reference (positive or None).
                            - file (dict): A dictionary storing file references associated with the transaction.
                    - hide (bool): Indicates whether the account is hidden or not.
                    - zakatable (bool): Indicates whether the account is subject to Zakat.
            - exchange (dict):
                - account (dict):
                    - {timestamps} (dict):
                        - rate (float): Exchange rate when compared to local currency.
                        - description (str): The description of the exchange rate.
            - history (dict):
                - {timestamp} (list): A list of dictionaries storing the history of actions performed.
                    - {action_dict} (dict):
                        - action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT).
                        - account (str): The account number associated with the action.
                        - ref (int): The reference number of the transaction.
                        - file (int): The reference number of the file (if applicable).
                        - key (str): The key associated with the action (e.g., 'rest', 'total').
                        - value (int): The value associated with the action.
                        - math (MathOperation): The mathematical operation performed (if applicable).
            - lock (int or None): The timestamp indicating the current lock status (None if not locked).
            - report (dict):
                - {timestamp} (tuple): A tuple storing Zakat report details.

    """

    @staticmethod
    def Version() -> str:
        """
        Returns the current version of the software.

        This function returns a string representing the current version of the software,
        including major, minor, and patch version numbers in the format "X.Y.Z".

        Returns:
            str: The current version of the software.
        """
        return '0.2.82'

    @staticmethod
    def ZakatCut(x: float) -> float:
        """
        Calculates the Zakat amount due on an asset.

        This function calculates the zakat amount due on a given asset value over one lunar year.
        Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth
        that exceeds a certain threshold (Nisab).

        Parameters:
            x: The total value of the asset on which Zakat is to be calculated.

        Returns:
            The amount of Zakat due on the asset, calculated as 2.5% of the asset's value.
        """
        return 0.025 * x  # Zakat Cut in one Lunar Year

    @staticmethod
    def TimeCycle(days: int = 355) -> int:
        """
        Calculates the approximate duration of a lunar year in nanoseconds.

        This function calculates the approximate duration of a lunar year based on the given number of days.
        It converts the given number of days into nanoseconds for use in high-precision timing applications.

        Parameters:
            days: The number of days in a lunar year. Defaults to 355,
                  which is an approximation of the average length of a lunar year.

        Returns:
            The approximate duration of a lunar year in nanoseconds.
        """
        return int(60 * 60 * 24 * days * 1e9)  # Lunar Year in nanoseconds

    @staticmethod
    def Nisab(gram_price: float, gram_quantity: float = 595) -> float:
        """
        Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram.

        This function calculates the Nisab value, which is the minimum threshold of wealth,
        that makes an individual liable for paying Zakat.
        The Nisab value is determined by the equivalent value of a specific amount
        of gold or silver (currently 595 grams in silver) in the local currency.

        Parameters:
            - gram_price (float): The price per gram of Nisab.
            - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver.

        Returns:
            - float: The total value of Nisab based on the given price per gram.
        """
        return gram_price * gram_quantity

    @staticmethod
    def ext() -> str:
        """
        Returns the file extension used by the ZakatTracker class.

        Returns:
            str: The file extension used by the ZakatTracker class, which is 'camel'.
        """
        return 'camel'

    def __init__(self, db_path: str = "zakat.camel", history_mode: bool = True):
        """
        Initialize ZakatTracker with database path and history mode.

        Parameters:
            db_path (str): The path to the database file. Default is "zakat.camel".
            history_mode (bool): The mode for tracking history. Default is True.

        Returns:
            None
        """
        self._base_path = None
        self._vault_path = None
        self._vault = None
        self.reset()
        self._history(history_mode)
        self.path(db_path)
        self.load()

    def path(self, path: str = None) -> str:
        """
        Set or get the path to the database file.

        If no path is provided, the current path is returned.
        If a path is provided, it is set as the new path.
        The function also creates the necessary directories if the provided path is a file.

        Parameters:
            path (str): The new path to the database file. If not provided, the current path is returned.

        Returns:
            str: The current or new path to the database file.
        """
        if path is None:
            return self._vault_path
        self._vault_path = Path(path).resolve()
        base_path = Path(path).resolve()
        if base_path.is_file() or base_path.suffix:
            base_path = base_path.parent
        base_path.mkdir(parents=True, exist_ok=True)
        self._base_path = base_path
        return str(self._vault_path)

    def base_path(self, *args) -> str:
        """
        Generate a base path by joining the provided arguments with the existing base path.

        Parameters:
            *args (str): Variable length argument list of strings to be joined with the base path.

        Returns:
            str: The generated base path. If no arguments are provided, the existing base path is returned.
        """
        if not args:
            return str(self._base_path)
        filtered_args = []
        ignored_filename = None
        for arg in args:
            if Path(arg).suffix:
                ignored_filename = arg
            else:
                filtered_args.append(arg)
        base_path = Path(self._base_path)
        full_path = base_path.joinpath(*filtered_args)
        full_path.mkdir(parents=True, exist_ok=True)
        if ignored_filename is not None:
            # Join with the ignored filename; convert to str to match the declared return type
            return str(full_path.resolve() / ignored_filename)
        return str(full_path.resolve())

    @staticmethod
    def scale(x: float | int | Decimal, decimal_places: int = 2) -> int:
        """
        Scales a numerical value by a specified power of 10, returning an integer.

        This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and
        facilitate precise scaling operations, particularly useful in financial or scientific calculations.

        Parameters:
            x: The numeric value to scale. Can be a floating-point number, integer, or decimal.
            decimal_places: The exponent for the scaling factor (10**decimal_places). Defaults to 2,
                            meaning the input is scaled by a factor of 100 (e.g., converts 1.23 to 123).

        Returns:
            The scaled value, rounded to the nearest integer.

        Raises:
            TypeError: If the input `x` is not a valid numeric type.

        Examples:
            >>> ZakatTracker.scale(3.14159)
            314
            >>> ZakatTracker.scale(1234, decimal_places=3)
            1234000
            >>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4)
            50
        """
        if not isinstance(x, (float, int, Decimal)):
            raise TypeError("Input 'x' must be a float, int, or Decimal.")
        return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))

    @staticmethod
    def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal:
        """
        Unscales an integer by a power of 10.

        Parameters:
            x: The integer to unscale.
            return_type: The desired type for the returned value. Can be float or Decimal. Defaults to float.
            decimal_places: The power of 10 to use. Defaults to 2.

        Returns:
            The unscaled number, converted to the specified return_type.

        Raises:
            TypeError: If the return_type is not float or Decimal.
        """
        if return_type not in (float, Decimal):
            raise TypeError(f'Invalid return_type({return_type}). Supported types are float and Decimal.')
        return round(return_type(x / (10 ** decimal_places)), decimal_places)

    def _history(self, status: bool = None) -> bool:
        """
        Enable or disable history tracking.

        Parameters:
            status (bool): The new status of history tracking. If None (the default), the current mode is left unchanged.

        Returns:
            bool: The current history-tracking mode.
        """
        if status is not None:
            self._history_mode = status
        return self._history_mode

    def reset(self) -> None:
        """
        Reset the internal data structure to its initial state.

        Parameters:
            None

        Returns:
            None
        """
        self._vault = {
            'account': {},
            'exchange': {},
            'history': {},
            'lock': None,
            'report': {},
        }

    @staticmethod
    def time(now: datetime = None) -> int:
        """
        Generates a timestamp based on the provided datetime object or the current datetime.

        Parameters:
            now (datetime, optional): The datetime object to generate the timestamp from.
                                      If not provided, the current datetime is used.

        Returns:
            int: The timestamp in nanoseconds since the Unix epoch (January 1, 1970).
                 Datetimes before 1970 yield negative values (back to 1000 AD).
        """
        if now is None:
            now = datetime.datetime.now()
        ordinal_day = now.toordinal()
        ns_in_day = (now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() * 10 ** 9
        return int((ordinal_day - 719_163) * 86_400_000_000_000 + ns_in_day)

    @staticmethod
    def time_to_datetime(ordinal_ns: int) -> datetime:
        """
        Converts a timestamp in nanoseconds since the Unix epoch (as produced by `time`) to a datetime object.

        Sub-second precision is discarded during the conversion.

        Parameters:
            ordinal_ns (int): The timestamp in nanoseconds since the Unix epoch.

        Returns:
            datetime: The corresponding datetime object.
        """
        ordinal_day = ordinal_ns // 86_400_000_000_000 + 719_163
        ns_in_day = ordinal_ns % 86_400_000_000_000
        d = datetime.datetime.fromordinal(ordinal_day)
        t = datetime.timedelta(seconds=ns_in_day // 10 ** 9)
        return datetime.datetime.combine(d, datetime.time()) + t

    def clean_history(self, lock: int | None = None) -> int:
        """
        Cleans up the history of actions performed on the ZakatTracker instance.

        Parameters:
            lock (int, optional): The lock ID whose empty history record should be cleaned up.
                                  If not provided, empty history records are cleaned up for all locks.

        Returns:
            int: The number of locks cleaned up.
        """
        count = 0
        if lock in self._vault['history']:
            if len(self._vault['history'][lock]) <= 0:
                count += 1
                del self._vault['history'][lock]
                return count
        self.free(self.lock())
        # Iterate over a snapshot of the keys: deleting from a dict while iterating it raises RuntimeError
        for lock in list(self._vault['history']):
            if len(self._vault['history'][lock]) <= 0:
                count += 1
                del self._vault['history'][lock]
        return count

    def _step(self, action: Action = None, account=None, ref: int = None, file: int = None, value: float = None,
              key: str = None, math_operation: MathOperation = None) -> int:
        """
        This method is responsible for recording the actions performed on the ZakatTracker.

        Parameters:
            - action (Action): The type of action performed.
            - account (str): The account number on which the action was performed.
            - ref (int): The reference number of the action.
            - file (int): The file reference number of the action.
            - value (int): The value associated with the action.
            - key (str): The key associated with the action.
            - math_operation (MathOperation): The mathematical operation performed during the action.

        Returns:
            - int: The lock time of the recorded action. If history tracking is disabled, it returns 0.
        """
        if not self._history():
            return 0
        lock = self._vault['lock']
        if self.nolock():
            lock = self._vault['lock'] = self.time()
            self._vault['history'][lock] = []
        if action is None:
            return lock
        self._vault['history'][lock].append({
            'action': action,
            'account': account,
            'ref': ref,
            'file': file,
            'key': key,
            'value': value,
            'math': math_operation,
        })
        return lock

    def nolock(self) -> bool:
        """
        Check if the vault lock is currently not set.

        Returns:
            bool: True if the vault lock is not set, False otherwise.
        """
        return self._vault['lock'] is None

    def lock(self) -> int:
        """
        Acquires a lock on the ZakatTracker instance.

        Returns:
            int: The lock ID.
                 This ID can be used to release the lock later.
        """
        return self._step()

    def vault(self) -> dict:
        """
        Returns a shallow copy of the internal vault dictionary.

        This method is used to retrieve the current state of the ZakatTracker object.
        It provides a snapshot of the internal data structure, allowing for further
        processing or analysis. Because the copy is shallow, nested dictionaries are
        still shared with the tracker.

        Returns:
            dict: A shallow copy of the internal vault dictionary.
        """
        return self._vault.copy()

    def stats(self) -> dict[str, tuple]:
        """
        Calculates and returns statistics about the object's data storage.

        This method determines the size of the database file on disk and the
        size of the data currently held in RAM (likely within a dictionary).
        Both sizes are reported in bytes and in a human-readable format
        (e.g., KB, MB).

        Returns:
            dict[str, tuple]: A dictionary containing the following statistics:

                * 'database': A tuple with two elements:
                    - The database file size in bytes (int).
                    - The database file size in human-readable format (str).
                * 'ram': A tuple with two elements:
                    - The RAM usage (dictionary size) in bytes (int).
                    - The RAM usage in human-readable format (str).

        Example:
            >>> stats = my_object.stats()
            >>> print(stats['database'])
            (256000, '250.0 KB')
            >>> print(stats['ram'])
            (12345, '12.1 KB')
        """
        ram_size = self.get_dict_size(self.vault())
        file_size = os.path.getsize(self.path())
        return {
            'database': (file_size, self.human_readable_size(file_size)),
            'ram': (ram_size, self.human_readable_size(ram_size)),
        }

    def files(self) -> list[dict[str, str | int]]:
        """
        Retrieves information about files associated with this instance.

        This method provides a standardized way to gather details about
        files used by the tracker for storage, snapshots, and CSV imports.

        Returns:
            list[dict[str, str | int]]: A list of dictionaries, each containing information
                about a specific file:

                * type (str): The type of file ('database', 'snapshot', 'import_csv').
                * path (str): The full file path.
                * exists (bool): Whether the file exists on the filesystem.
                * size (int): The file size in bytes (0 if the file doesn't exist).
                * human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB').

        Example:
            ```
            file_info = tracker.files()  # tracker is a ZakatTracker instance
            for info in file_info:
                print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}")
            ```
        """
        result = []
        for file_type, path in {
            'database': self.path(),
            'snapshot': self.snapshot_cache_path(),
            'import_csv': self.import_csv_cache_path(),
        }.items():
            exists = os.path.exists(path)
            size = os.path.getsize(path) if exists else 0
            human_readable_size = self.human_readable_size(size) if exists else 0
            result.append({
                'type': file_type,
                'path': path,
                'exists': exists,
                'size': size,
                'human_readable_size': human_readable_size,
            })
        return result

    def steps(self) -> dict:
        """
        Returns a shallow copy of the history of steps taken in the ZakatTracker.

        The history is a dictionary where each key is the lock timestamp identifying a step,
        and the corresponding value is the list of action records stored for that step.

        Returns:
            dict: A shallow copy of the history of steps taken in the ZakatTracker.
        """
        return self._vault['history'].copy()

    def free(self, lock: int, auto_save: bool = True) -> bool:
        """
        Releases the lock on the database.

        Parameters:
            lock (int): The lock ID to be released.
            auto_save (bool): Whether to automatically save the database after releasing the lock.

        Returns:
            bool: True if the lock is successfully released and (optionally) saved, False otherwise.
        """
        if lock == self._vault['lock']:
            self._vault['lock'] = None
            self.clean_history(lock)
            if auto_save:
                return self.save(self.path())
            return True
        return False

    def account_exists(self, account) -> bool:
        """
        Check if the given account exists in the vault.

        Parameters:
            account (str): The account number to check.

        Returns:
            bool: True if the account exists, False otherwise.
        """
        return account in self._vault['account']

    def box_size(self, account) -> int:
        """
        Calculate the size of the box for a specific account.

        Parameters:
            account (str): The account number for which the box size needs to be calculated.

        Returns:
            int: The size of the box for the given account. If the account does not exist, -1 is returned.
        """
        if self.account_exists(account):
            return len(self._vault['account'][account]['box'])
        return -1

    def log_size(self, account) -> int:
        """
        Get the size of the log for a specific account.

        Parameters:
            account (str): The account number for which the log size needs to be calculated.

        Returns:
            int: The size of the log for the given account. If the account does not exist, -1 is returned.
        """
        if self.account_exists(account):
            return len(self._vault['account'][account]['log'])
        return -1

    @staticmethod
    def file_hash(file_path: str, algorithm: str = "blake2b") -> str:
        """
        Calculates the hash of a file using the specified algorithm.

        Parameters:
            file_path (str): The path to the file.
            algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b".

        Returns:
            str: The hexadecimal representation of the file's hash.
        """
        hash_obj = hashlib.new(algorithm)  # Create the hash object
        with open(file_path, "rb") as f:  # Open file in binary mode for reading
            for chunk in iter(lambda: f.read(4096), b""):  # Read file in chunks
                hash_obj.update(chunk)
        return hash_obj.hexdigest()  # Return the hash as a hexadecimal string

    def snapshot_cache_path(self):
        """
        Generate the path for the cache file used to store snapshots.

        The cache file is a camel file that stores the timestamps of the snapshots.
        The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel".

        Returns:
            str: The path to the cache file.
        """
        path = str(self.path())
        ext = self.ext()
        ext_len = len(ext)
        if path.endswith(f'.{ext}'):
            path = path[:-ext_len-1]
        return path + f'.snapshots.{ext}'

    def snapshot(self) -> bool:
        """
        This function creates a snapshot of the current database state.

        The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists.
        If a snapshot with the same hash exists, the function returns True without creating a new snapshot.
        If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state
        in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp.

        Parameters:
            None

        Returns:
            bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails.
        """
        current_hash = self.file_hash(self.path())
        cache: dict[str, int] = {}  # hash: time_ns
        try:
            with open(self.snapshot_cache_path(), 'r') as stream:
                cache = camel.load(stream.read())
        except Exception:
            # Missing or unreadable cache file: continue with an empty cache
            pass
        if current_hash in cache:
            return True
        time = time_ns()
        cache[current_hash] = time
        if not self.save(self.base_path('snapshots', f'{time}.{self.ext()}')):
            return False
        with open(self.snapshot_cache_path(), 'w') as stream:
            stream.write(camel.dump(cache))
        return True

    def snapshots(self, hide_missing: bool = True, verified_hash_only: bool = False) \
            -> dict[int, tuple[str, str, bool]]:
        """
        Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status.

        Parameters:
            - hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True.
            - verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False.

        Returns:
            - dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots,
              and the values are tuples containing the snapshot's hash, path, and existence status.
        """
        cache: dict[str, int] = {}  # hash: time_ns
        try:
            with open(self.snapshot_cache_path(), 'r') as stream:
                cache = camel.load(stream.read())
        except Exception:
            # Missing or unreadable cache file: treat as no snapshots
            pass
        if not cache:
            return {}
        result: dict[int, tuple[str, str, bool]] = {}  # time_ns: (hash, path, exists)
        for file_hash, ref in cache.items():
            path = self.base_path('snapshots', f'{ref}.{self.ext()}')
            exists = os.path.exists(path)
            # Hash only files that exist; hashing a missing file would raise FileNotFoundError
            if verified_hash_only and not (exists and self.file_hash(path) == file_hash):
                continue
            if exists or not hide_missing:
                result[ref] = (file_hash, path, exists)
        return result

    def recall(self, dry=True, debug=False) -> bool:
        """
        Revert the last operation.

        Parameters:
            dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True.
            debug (bool): If True, the function will print debug information. Default is False.

        Returns:
            bool: True if the operation was successful, False otherwise.
        """
        if not self.nolock() or len(self._vault['history']) == 0:
            return False
        ref = sorted(self._vault['history'].keys())[-1]
        if debug:
            print('recall', ref)
        memory = self._vault['history'][ref]
        if debug:
            print(type(memory), 'memory', memory)
        limit = len(memory) + 1
        sub_positive_log_negative = 0
        for i in range(-1, -limit, -1):
            x = memory[i]
            if debug:
                print(type(x), x)
            match x['action']:
                case Action.CREATE:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if debug:
                                print('account', self._vault['account'][x['account']])
                            assert len(self._vault['account'][x['account']]['box']) == 0
                            assert self._vault['account'][x['account']]['balance'] == 0
                            assert self._vault['account'][x['account']]['count'] == 0
                            if dry:
                                continue
                            del self._vault['account'][x['account']]

                case Action.TRACK:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if dry:
                                continue
                            self._vault['account'][x['account']]['balance'] -= x['value']
                            self._vault['account'][x['account']]['count'] -= 1
                            del self._vault['account'][x['account']]['box'][x['ref']]

                case Action.LOG:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['log']:
                                if dry:
                                    continue
                                if sub_positive_log_negative == -x['value']:
                                    self._vault['account'][x['account']]['count'] -= 1
                                    sub_positive_log_negative = 0
                                box_ref = self._vault['account'][x['account']]['log'][x['ref']]['ref']
                                if box_ref is not None:
                                    assert self.box_exists(x['account'], box_ref)
                                    box_value = self._vault['account'][x['account']]['log'][x['ref']]['value']
                                    assert box_value < 0

                                    try:
                                        self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value
                                    except TypeError:
                                        self._vault['account'][x['account']]['box'][box_ref]['rest'] += Decimal(
                                            -box_value)

                                    try:
                                        self._vault['account'][x['account']]['balance'] += -box_value
                                    except TypeError:
                                        self._vault['account'][x['account']]['balance'] += Decimal(-box_value)

                                    self._vault['account'][x['account']]['count'] -= 1
                                del self._vault['account'][x['account']]['log'][x['ref']]

                case Action.SUB:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['box']:
                                if dry:
                                    continue
                                self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value']
                                self._vault['account'][x['account']]['balance'] += x['value']
                                sub_positive_log_negative = x['value']

                case Action.ADD_FILE:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['log']:
                                if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']:
                                    if dry:
                                        continue
                                    del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']]

                case Action.REMOVE_FILE:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['log']:
                                if dry:
                                    continue
                                self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value']

                case Action.BOX_TRANSFER:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['box']:
                                if dry:
                                    continue
                                self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value']

                case Action.EXCHANGE:
                    if x['account'] is not None:
                        if x['account'] in self._vault['exchange']:
                            if x['ref'] in self._vault['exchange'][x['account']]:
                                if dry:
                                    continue
                                del self._vault['exchange'][x['account']][x['ref']]

                case Action.REPORT:
                    if x['ref'] in self._vault['report']:
                        if dry:
                            continue
                        del self._vault['report'][x['ref']]

                case Action.ZAKAT:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['box']:
                                if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]:
                                    if dry:
                                        continue
                                    match x['math']:
                                        case MathOperation.ADDITION:
                                            self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x['value']
                                        case MathOperation.EQUAL:
                                            self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value']
                                        case MathOperation.SUBTRACTION:
                                            self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x['value']

        if not dry:
            del self._vault['history'][ref]
        return True

    def ref_exists(self, account: str, ref_type: str, ref: int) -> bool:
        """
        Check if a specific reference (transaction) exists in the vault for a given account and reference type.

        Parameters:
            account (str): The account number for which to check the existence of the reference.
            ref_type (str): The type of reference (e.g., 'box', 'log', etc.).
            ref (int): The reference (transaction) number to check for existence.

        Returns:
            bool: True if the reference exists for the given account and reference type, False otherwise.
        """
        if account in self._vault['account']:
            return ref in self._vault['account'][account][ref_type]
        return False

    def box_exists(self, account: str, ref: int) -> bool:
        """
        Check if a specific box (transaction) exists in the vault for a given account and reference.

        Parameters:
            - account (str): The account number for which to check the existence of the box.
            - ref (int): The reference (transaction) number to check for existence.

        Returns:
            - bool: True if the box exists for the given account and reference, False otherwise.
        """
        return self.ref_exists(account, 'box', ref)

    def track(self, unscaled_value: float | int | Decimal = 0, desc: str = '', account: str = 1, logging: bool = True,
              created: int = None,
              debug: bool = False) -> int:
        """
        This function tracks a transaction for a specific account.

        Parameters:
        unscaled_value (float | int | Decimal): The value of the transaction. Default is 0.
        desc (str): The description of the transaction. Default is an empty string.
        account (str): The account for which the transaction is being tracked. Default is '1'.
        logging (bool): Whether to log the transaction. Default is True.
        created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None.
        debug (bool): Whether to print debug information. Default is False.

        Returns:
        int: The timestamp of the transaction.

        This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box.

        Raises:
        ValueError: The log transaction happened again in the same nanosecond time.
        ValueError: The box transaction happened again in the same nanosecond time.
        """
        if debug:
            print('track', f'unscaled_value={unscaled_value}, debug={debug}')
        if created is None:
            created = self.time()
        no_lock = self.nolock()
        self.lock()
        if not self.account_exists(account):
            if debug:
                print(f"account {account} created")
            self._vault['account'][account] = {
                'balance': 0,
                'box': {},
                'count': 0,
                'log': {},
                'hide': False,
                'zakatable': True,
            }
            self._step(Action.CREATE, account)
        if unscaled_value == 0:
            if no_lock:
                self.free(self.lock())
            return 0
        value = self.scale(unscaled_value)
        if logging:
            self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug)
        if debug:
            print('create-box', created)
        if self.box_exists(account, created):
            raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).")
        if debug:
            print('created-box', created)
        self._vault['account'][account]['box'][created] = {
            'capital': value,
            'count': 0,
            'last': 0,
            'rest': value,
            'total': 0,
        }
        self._step(Action.TRACK, account, ref=created, value=value)
        if no_lock:
            self.free(self.lock())
        return created

    def log_exists(self, account: str, ref: int) -> bool:
        """
        Checks if a specific transaction log entry exists for a given account.

        Parameters:
        account (str): The account number associated with the transaction log.
        ref (int): The reference to the transaction log entry.

        Returns:
        bool: True if the transaction log entry exists, False otherwise.
        """
        return self.ref_exists(account, 'log', ref)

    def _log(self, value: float, desc: str = '', account: str = 1, created: int = None, ref: int = None,
             debug: bool = False) -> int:
        """
        Log a transaction into the account's log.

        Parameters:
        value (float): The value of the transaction.
        desc (str): The description of the transaction.
        account (str): The account to log the transaction into. Default is '1'.
        created (int): The timestamp of the transaction. If not provided, it will be generated.

        Returns:
        int: The timestamp of the logged transaction.

        This method updates the account's balance, count, and log with the transaction details.
        It also creates a step in the history of the transaction.

        Raises:
        ValueError: The log transaction happened again in the same nanosecond time.
        """
        if debug:
            print('_log', f'debug={debug}')
        if created is None:
            created = self.time()
        try:
            self._vault['account'][account]['balance'] += value
        except TypeError:
            self._vault['account'][account]['balance'] += Decimal(value)
        self._vault['account'][account]['count'] += 1
        if debug:
            print('create-log', created)
        if self.log_exists(account, created):
            raise ValueError(f"The log transaction happened again in the same nanosecond time({created}).")
        if debug:
            print('created-log', created)
        self._vault['account'][account]['log'][created] = {
            'value': value,
            'desc': desc,
            'ref': ref,
            'file': {},
        }
        self._step(Action.LOG, account, ref=created, value=value)
        return created

    def exchange(self, account, created: int = None, rate: float = None, description: str = None,
                 debug: bool = False) -> dict:
        """
        This method is used to record or retrieve exchange rates for a specific account.

        Parameters:
        - account (str): The account number for which the exchange rate is being recorded or retrieved.
        - created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used.
        - rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate.
        - description (str): A description of the exchange rate.

        Returns:
        - dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found,
        it returns a dictionary with default values for the rate and description.
        """
        if debug:
            print('exchange', f'debug={debug}')
        if created is None:
            created = self.time()
        no_lock = self.nolock()
        self.lock()
        if rate is not None:
            if rate <= 0:
                return dict()
            if account not in self._vault['exchange']:
                self._vault['exchange'][account] = {}
            if len(self._vault['exchange'][account]) == 0 and rate <= 1:
                return {"time": created, "rate": 1, "description": None}
            self._vault['exchange'][account][created] = {"rate": rate, "description": description}
            self._step(Action.EXCHANGE, account, ref=created, value=rate)
            if no_lock:
                self.free(self.lock())
            if debug:
                print("exchange-created-1",
                      f'account: {account}, created: {created}, rate:{rate}, description:{description}')

        if account in self._vault['exchange']:
            valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created]
            if valid_rates:
                latest_rate = max(valid_rates, key=lambda x: x[0])
                if debug:
                    print("exchange-read-1",
                          f'account: {account}, created: {created}, rate:{rate}, description:{description}',
                          'latest_rate', latest_rate)
                result = latest_rate[1]
                result['time'] = latest_rate[0]
                return result  # Return a dictionary containing the rate and description
        if debug:
            print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}')
        return {"time": created, "rate": 1, "description": None}  # Return the default value with an empty description

    @staticmethod
    def exchange_calc(x: float, x_rate: float, y_rate: float) -> float:
        """
        This function calculates the exchanged amount of a currency.

        Args:
        x (float): The original amount of the currency.
        x_rate (float): The exchange rate of the original currency.
        y_rate (float): The exchange rate of the target currency.

        Returns:
        float: The exchanged amount of the target currency.
        """
        return (x * x_rate) / y_rate

    def exchanges(self) -> dict:
        """
        Retrieve the recorded exchange rates for all accounts.

        Parameters:
        None

        Returns:
        dict: A dictionary containing all recorded exchange rates.
        The keys are account names or numbers, and the values are dictionaries containing the exchange rates.
        Each exchange rate dictionary has timestamps as keys and exchange rate details as values.
        """
        return self._vault['exchange'].copy()

    def accounts(self) -> dict:
        """
        Returns a dictionary containing account numbers as keys and their respective balances as values.

        Parameters:
        None

        Returns:
        dict: A dictionary where keys are account numbers and values are their respective balances.
        """
        result = {}
        for i in self._vault['account']:
            result[i] = self._vault['account'][i]['balance']
        return result

    def boxes(self, account) -> dict:
        """
        Retrieve the boxes (transactions) associated with a specific account.

        Parameters:
        account (str): The account number for which to retrieve the boxes.

        Returns:
        dict: A dictionary containing the boxes associated with the given account.
        If the account does not exist, an empty dictionary is returned.
        """
        if self.account_exists(account):
            return self._vault['account'][account]['box']
        return {}

    def logs(self, account) -> dict:
        """
        Retrieve the logs (transactions) associated with a specific account.

        Parameters:
        account (str): The account number for which to retrieve the logs.

        Returns:
        dict: A dictionary containing the logs associated with the given account.
        If the account does not exist, an empty dictionary is returned.
        """
        if self.account_exists(account):
            return self._vault['account'][account]['log']
        return {}

    def add_file(self, account: str, ref: int, path: str) -> int:
        """
        Adds a file reference to a specific transaction log entry in the vault.

        Parameters:
        account (str): The account number associated with the transaction log.
        ref (int): The reference to the transaction log entry.
        path (str): The path of the file to be added.

        Returns:
        int: The reference of the added file. If the account or transaction log entry does not exist, returns 0.
        """
        if self.account_exists(account):
            if ref in self._vault['account'][account]['log']:
                file_ref = self.time()
                self._vault['account'][account]['log'][ref]['file'][file_ref] = path
                no_lock = self.nolock()
                self.lock()
                self._step(Action.ADD_FILE, account, ref=ref, file=file_ref)
                if no_lock:
                    self.free(self.lock())
                return file_ref
        return 0

    def remove_file(self, account: str, ref: int, file_ref: int) -> bool:
        """
        Removes a file reference from a specific transaction log entry in the vault.

        Parameters:
        account (str): The account number associated with the transaction log.
        ref (int): The reference to the transaction log entry.
        file_ref (int): The reference of the file to be removed.

        Returns:
        bool: True if the file reference is successfully removed, False otherwise.
        """
        if self.account_exists(account):
            if ref in self._vault['account'][account]['log']:
                if file_ref in self._vault['account'][account]['log'][ref]['file']:
                    x = self._vault['account'][account]['log'][ref]['file'][file_ref]
                    del self._vault['account'][account]['log'][ref]['file'][file_ref]
                    no_lock = self.nolock()
                    self.lock()
                    self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x)
                    if no_lock:
                        self.free(self.lock())
                    return True
        return False

    def balance(self, account: str = 1, cached: bool = True) -> int:
        """
        Calculate and return the balance of a specific account.

        Parameters:
        account (str): The account number. Default is '1'.
        cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True.

        Returns:
        int: The balance of the account.

        Note:
        If cached is True, the function returns the cached balance.
        If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items.
        """
        if cached:
            return self._vault['account'][account]['balance']
        # Sum the remaining value of every box; an account with no boxes yields 0
        # instead of raising IndexError.
        return sum(y['rest'] for y in self._vault['account'][account]['box'].values())

    def hide(self, account, status: bool = None) -> bool:
        """
        Check or set the hide status of a specific account.

        Parameters:
        account (str): The account number.
        status (bool, optional): The new hide status. If not provided, the function will return the current status.

        Returns:
        bool: The current or updated hide status of the account.

        Raises:
        None

        Example:
        >>> tracker = ZakatTracker()
        >>> ref = tracker.track(51, 'desc', 'account1')
        >>> tracker.hide('account1')  # Get the hide status of 'account1' (False by default)
        False
        >>> tracker.hide('account1', True)  # Set the hide status of 'account1' to True
        True
        >>> tracker.hide('account1')  # Get the updated hide status of 'account1'
        True
        >>> tracker.hide('account1', False)  # Set the hide status of 'account1' back to False
        False
        """
        if self.account_exists(account):
            if status is None:
                return self._vault['account'][account]['hide']
            self._vault['account'][account]['hide'] = status
            return status
        return False

    def zakatable(self, account, status: bool = None) -> bool:
        """
        Check or set the zakatable status of a specific account.

        Parameters:
        account (str): The account number.
        status (bool, optional): The new zakatable status. If not provided, the function will return the current status.

        Returns:
        bool: The current or updated zakatable status of the account.

        Raises:
        None

        Example:
        >>> tracker = ZakatTracker()
        >>> ref = tracker.track(51, 'desc', 'account1')
        >>> tracker.zakatable('account1')  # Get the zakatable status of 'account1' (True by default)
        True
        >>> tracker.zakatable('account1', True)  # Set the zakatable status of 'account1' to True
        True
        >>> tracker.zakatable('account1')  # Get the current zakatable status of 'account1'
        True
        >>> tracker.zakatable('account1', False)  # Set the zakatable status of 'account1' to False
        False
        """
        if self.account_exists(account):
            if status is None:
                return self._vault['account'][account]['zakatable']
            self._vault['account'][account]['zakatable'] = status
            return status
        return False

    def sub(self, unscaled_value: float | int | Decimal, desc: str = '', account: str = 1, created: int = None,
            debug: bool = False) -> tuple[int, list[tuple[int, int]]] | tuple:
        """
        Subtracts a specified value from an account's balance.

        Parameters:
        unscaled_value (float | int | Decimal): The amount to be subtracted.
        desc (str): A description for the transaction. Defaults to an empty string.
        account (str): The account from which the value will be subtracted. Defaults to '1'.
        created (int): The timestamp of the transaction. If not provided, the current timestamp will be used.
        debug (bool): A flag indicating whether to print debug information. Defaults to False.

        Returns:
        tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction.

        If the amount to subtract is greater than the account's balance,
        the remaining amount will be transferred to a new transaction with a negative value.

        Raises:
        ValueError: The box transaction happened again in the same nanosecond time.
        ValueError: The log transaction happened again in the same nanosecond time.
        """
        if debug:
            print('sub', f'debug={debug}')
        if unscaled_value < 0:
            return tuple()
        if unscaled_value == 0:
            ref = self.track(unscaled_value, '', account)
            return ref, ref
        if created is None:
            created = self.time()
        no_lock = self.nolock()
        self.lock()
        self.track(0, '', account)
        value = self.scale(unscaled_value)
        self._log(value=-value, desc=desc, account=account, created=created, ref=None, debug=debug)
        ids = sorted(self._vault['account'][account]['box'].keys())
        limit = len(ids) + 1
        target = value
        if debug:
            print('ids', ids)
        ages = []
        for i in range(-1, -limit, -1):
            if target == 0:
                break
            j = ids[i]
            if debug:
                print('i', i, 'j', j)
            rest = self._vault['account'][account]['box'][j]['rest']
            if rest >= target:
                self._vault['account'][account]['box'][j]['rest'] -= target
                self._step(Action.SUB, account, ref=j, value=target)
                ages.append((j, target))
                target = 0
                break
            elif target > rest > 0:
                chunk = rest
                target -= chunk
                self._step(Action.SUB, account, ref=j, value=chunk)
                ages.append((j, chunk))
                self._vault['account'][account]['box'][j]['rest'] = 0
        if target > 0:
            self.track(
                unscaled_value=self.unscale(-target),
                desc=desc,
                account=account,
                logging=False,
                created=created,
            )
            ages.append((created, target))
        if no_lock:
            self.free(self.lock())
        return created, ages

    def transfer(self, unscaled_amount: float | int | Decimal, from_account: str, to_account: str, desc: str = '',
                 created: int = None,
                 debug: bool = False) -> list[int]:
        """
        Transfers a specified value from one account to another.

        Parameters:
        unscaled_amount (float | int | Decimal): The amount to be transferred.
        from_account (str): The account from which the value will be transferred.
        to_account (str): The account to which the value will be transferred.
        desc (str, optional): A description for the transaction. Defaults to an empty string.
        created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used.
        debug (bool): A flag indicating whether to print debug information. Defaults to False.

        Returns:
        list[int]: A list of timestamps corresponding to the transactions made during the transfer.

        Raises:
        ValueError: Transfer to the same account is forbidden.
        ValueError: The box transaction happened again in the same nanosecond time.
        ValueError: The log transaction happened again in the same nanosecond time.
        """
        if debug:
            print('transfer', f'debug={debug}')
        if from_account == to_account:
            raise ValueError(f'Transfer to the same account is forbidden. {to_account}')
        if unscaled_amount <= 0:
            return []
        if created is None:
            created = self.time()
        (_, ages) = self.sub(unscaled_amount, desc, from_account, created, debug=debug)
        times = []
        source_exchange = self.exchange(from_account, created)
        target_exchange = self.exchange(to_account, created)

        if debug:
            print('ages', ages)

        for age, value in ages:
            target_amount = int(self.exchange_calc(value, source_exchange['rate'], target_exchange['rate']))
            if debug:
                print('target_amount', target_amount)
            # Perform the transfer
            if self.box_exists(to_account, age):
                if debug:
                    print('box_exists', age)
                capital = self._vault['account'][to_account]['box'][age]['capital']
                rest = self._vault['account'][to_account]['box'][age]['rest']
                if debug:
                    print(
                        f"Transfer(loop) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).")
                selected_age = age
                if rest + target_amount > capital:
                    self._vault['account'][to_account]['box'][age]['capital'] += target_amount
                    selected_age = ZakatTracker.time()
                self._vault['account'][to_account]['box'][age]['rest'] += target_amount
                self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount)
                y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account,
                              created=None, ref=None, debug=debug)
                times.append((age, y))
                continue
            if debug:
                print(
                    f"Transfer(func) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).")
            y = self.track(
                unscaled_value=self.unscale(int(target_amount)),
                desc=desc,
                account=to_account,
                logging=True,
                created=age,
                debug=debug,
            )
            times.append(y)
        return times

    def check(self, silver_gram_price: float, unscaled_nisab: float | int | Decimal = None, debug: bool = False, now: int = None,
              cycle: float = None) -> tuple:
        """
        Check the eligibility for Zakat based on the given parameters.

        Parameters:
        silver_gram_price (float): The price of a gram of silver.
        unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided,
        it will be calculated based on the silver_gram_price.
        debug (bool): Flag to enable debug mode.
        now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time().
        cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle().

        Returns:
        tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics,
        and a dictionary containing the Zakat plan.
        """
        if debug:
            print('check', f'debug={debug}')
        if now is None:
            now = self.time()
        if cycle is None:
            cycle = ZakatTracker.TimeCycle()
        if unscaled_nisab is None:
            unscaled_nisab = ZakatTracker.Nisab(silver_gram_price)
        nisab = self.scale(unscaled_nisab)
        plan = {}
        below_nisab = 0
        brief = [0, 0, 0]
        valid = False
        if debug:
            print('exchanges', self.exchanges())
        for x in self._vault['account']:
            if not self.zakatable(x):
                continue
            _box = self._vault['account'][x]['box']
            _log = self._vault['account'][x]['log']
            limit = len(_box) + 1
            ids = sorted(self._vault['account'][x]['box'].keys())
            for i in range(-1, -limit, -1):
                j = ids[i]
                rest = float(_box[j]['rest'])
                if rest <= 0:
                    continue
                exchange = self.exchange(x, created=self.time())
                rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1)
                brief[0] += rest
                index = limit + i - 1
                epoch = (now - j) / cycle
                if debug:
                    print(f"Epoch: {epoch}", _box[j])
                if _box[j]['last'] > 0:
                    epoch = (now - _box[j]['last']) / cycle
                if debug:
                    print(f"Epoch: {epoch}")
                epoch = floor(epoch)
                if debug:
                    print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch)
                if epoch == 0:
                    continue
                if debug:
                    print("Epoch - PASSED")
                brief[1] += rest
                if rest >= nisab:
                    total = 0
                    for _ in range(epoch):
                        total += ZakatTracker.ZakatCut(float(rest) - float(total))
                    if total > 0:
                        if x not in plan:
                            plan[x] = {}
                        valid = True
                        brief[2] += total
                        plan[x][index] = {
                            'total': total,
                            'count': epoch,
                            'box_time': j,
                            'box_capital': _box[j]['capital'],
                            'box_rest': _box[j]['rest'],
                            'box_last': _box[j]['last'],
                            'box_total': _box[j]['total'],
                            'box_count': _box[j]['count'],
                            'box_log': _log[j]['desc'],
                            'exchange_rate': exchange['rate'],
                            'exchange_time': exchange['time'],
                            'exchange_desc': exchange['description'],
                        }
                else:
                    chunk = ZakatTracker.ZakatCut(float(rest))
                    if chunk > 0:
                        if x not in plan:
                            plan[x] = {}
                        # The plan is keyed by box index, so test the index (not the box timestamp).
                        if index not in plan[x]:
                            plan[x][index] = {}
                        below_nisab += rest
                        brief[2] += chunk
                        plan[x][index]['below_nisab'] = chunk
                        plan[x][index]['total'] = chunk
                        plan[x][index]['count'] = epoch
                        plan[x][index]['box_time'] = j
                        plan[x][index]['box_capital'] = _box[j]['capital']
                        plan[x][index]['box_rest'] = _box[j]['rest']
                        plan[x][index]['box_last'] = _box[j]['last']
                        plan[x][index]['box_total'] = _box[j]['total']
                        plan[x][index]['box_count'] = _box[j]['count']
                        plan[x][index]['box_log'] = _log[j]['desc']
                        plan[x][index]['exchange_rate'] = exchange['rate']
                        plan[x][index]['exchange_time'] = exchange['time']
                        plan[x][index]['exchange_desc'] = exchange['description']
        valid = valid or below_nisab >= nisab
        if debug:
            print(f"below_nisab({below_nisab}) >= nisab({nisab})")
        return valid, brief, plan

    def build_payment_parts(self, demand: float, positive_only: bool = True) -> dict:
        """
        Build payment parts for the Zakat distribution.

        Parameters:
        demand (float): The total demand for payment in local currency.
        positive_only (bool): If True, only consider accounts with positive balance. Default is True.

        Returns:
        dict: A dictionary containing the payment parts for each account. The dictionary has the following structure:
        {
            'account': {
                'account_id': {'balance': float, 'rate': float, 'part': float},
                ...
            },
            'exceed': bool,
            'demand': float,
            'total': float,
        }
        """
        total = 0
        parts = {
            'account': {},
            'exceed': False,
            'demand': demand,
        }
        for x, y in self.accounts().items():
            if positive_only and y <= 0:
                continue
            total += float(y)
            exchange = self.exchange(x)
            parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0}
        parts['total'] = total
        return parts

    @staticmethod
    def check_payment_parts(parts: dict, debug: bool = False) -> int:
        """
        Checks the validity of payment parts.

        Parameters:
        parts (dict): A dictionary containing payment parts information.
        debug (bool): Flag to enable debug mode.

        Returns:
        int: Returns 0 if the payment parts are valid, otherwise returns the error code.

        Error Codes:
        1: 'demand', 'account', 'total', or 'exceed' key is missing in parts.
        2: 'balance', 'rate' or 'part' key is missing in parts['account'][x].
        3: 'part' value in parts['account'][x] is less than 0.
        4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0.
        5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value.
        6: The sum of 'part' values in parts['account'] does not match with 'demand' value.
        """
        if debug:
            print('check_payment_parts', f'debug={debug}')
        for i in ['demand', 'account', 'total', 'exceed']:
            if i not in parts:
                return 1
        exceed = parts['exceed']
        for x in parts['account']:
            for j in ['balance', 'rate', 'part']:
                if j not in parts['account'][x]:
                    return 2
            if parts['account'][x]['part'] < 0:
                return 3
            if not exceed and parts['account'][x]['balance'] <= 0:
                return 4
        demand = parts['demand']
        z = 0
        for _, y in parts['account'].items():
            if not exceed and y['part'] > y['balance']:
                return 5
            z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1)
        z = round(z, 2)
        demand = round(demand, 2)
        if debug:
            print('check_payment_parts', f'z = {z}, demand = {demand}')
            print('check_payment_parts', type(z), type(demand))
            print('check_payment_parts', z != demand)
            print('check_payment_parts', str(z) != str(demand))
        if z != demand and str(z) != str(demand):
            return 6
        return 0

    def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool:
        """
        Perform Zakat calculation based on the given report and optional parts.

        Parameters:
        report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan.
        parts (dict): A dictionary containing the payment parts for the zakat.
        debug (bool): A flag indicating whether to print debug information.

        Returns:
        bool: True if the zakat calculation is successful, False otherwise.
        """
        if debug:
            print('zakat', f'debug={debug}')
        valid, _, plan = report
        if not valid:
            return valid
        parts_exist = parts is not None
        if parts_exist:
            if self.check_payment_parts(parts, debug=debug) != 0:
                return False
        if debug:
            print('######### zakat #######')
            print('parts_exist', parts_exist)
        no_lock = self.nolock()
        self.lock()
        report_time = self.time()
        self._vault['report'][report_time] = report
        self._step(Action.REPORT, ref=report_time)
        created = self.time()
        for x in plan:
            target_exchange = self.exchange(x)
            if debug:
                print(plan[x])
                print('-------------')
                print(self._vault['account'][x]['box'])
            ids = sorted(self._vault['account'][x]['box'].keys())
            if debug:
                print('plan[x]', plan[x])
            for i in plan[x].keys():
                j = ids[i]
                if debug:
                    print('i', i, 'j', j)
                self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'],
                           key='last',
                           math_operation=MathOperation.EQUAL)
                self._vault['account'][x]['box'][j]['last'] = created
                amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate']))
                self._vault['account'][x]['box'][j]['total'] += amount
                self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total',
                           math_operation=MathOperation.ADDITION)
                self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count']
                self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count',
                           math_operation=MathOperation.ADDITION)
                if not parts_exist:
                    try:
                        self._vault['account'][x]['box'][j]['rest'] -= amount
                    except TypeError:
                        self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount)
                    # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest',
                    #            math_operation=MathOperation.SUBTRACTION)
                    self._log(-float(amount), desc='zakat-زكاة', account=x,
                              created=None, ref=j, debug=debug)
        if parts_exist:
            for account, part in parts['account'].items():
                if part['part'] == 0:
                    continue
                if debug:
                    print('zakat-part', account, part['rate'])
                target_exchange = self.exchange(account)
                amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate'])
                self.sub(amount, desc='zakat-part-دفعة-زكاة', account=account, debug=debug)
        if no_lock:
            self.free(self.lock())
        return True

    def export_json(self, path: str = "data.json") -> bool:
        """
        Exports the current state of the ZakatTracker object to a JSON file.

        Parameters:
        path (str): The path where the JSON file will be saved. Default is "data.json".

        Returns:
        bool: True if the export is successful, False otherwise.

        Raises:
        No specific exceptions are raised by this method.
        """
        with open(path, "w") as file:
            json.dump(self._vault, file, indent=4, cls=JSONEncoder)
        return True

    def save(self, path: str = None) -> bool:
        """
        Saves the ZakatTracker's current state to a camel file.

        This method serializes the internal data (`_vault`).

        Parameters:
        path (str, optional): File path for saving. Defaults to a predefined location.

        Returns:
        bool: True if the save operation is successful, False otherwise.
        """
        if path is None:
            path = self.path()
        with open(f'{path}.tmp', 'w') as stream:
            # first save in tmp file
            stream.write(camel.dump(self._vault))
        # then move tmp file to original location
        shutil.move(f'{path}.tmp', path)
        return True

    def load(self, path: str = None) -> bool:
        """
        Load the current state of the ZakatTracker object from a camel file.

        Parameters:
        path (str): The path where the camel file is located. If not provided, it will use the default path.

        Returns:
        bool: True if the load operation is successful, False otherwise.
        """
        if path is None:
            path = self.path()
        if os.path.exists(path):
            with open(path, 'r') as stream:
                self._vault = camel.load(stream.read())
            return True
        return False

    def import_csv_cache_path(self):
        """
        Generates the cache file path for imported CSV data.

        This function constructs the file path where cached data from CSV imports
        will be stored. The cache file is a camel file (.camel extension) appended
        to the base path of the object.

        Returns:
        str: The full path to the import CSV cache file.

        Example:
        >>> obj = ZakatTracker('/data/reports')
        >>> obj.import_csv_cache_path()
        '/data/reports.import_csv.camel'
        """
        path = str(self.path())
        ext = self.ext()
        ext_len = len(ext)
        if path.endswith(f'.{ext}'):
            path = path[:-ext_len - 1]
        return path + f'.import_csv.{ext}'

    def import_csv(self, path: str = 'file.csv', debug: bool = False) -> tuple:
        """
        The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system.

        Parameters:
        path (str): The path to the CSV file. Default is 'file.csv'.
        debug (bool): A flag indicating whether to print debug information.

        Returns:
        tuple: A tuple containing the number of transactions created, the number of transactions found in the cache,
        and a dictionary of bad transactions.

        Notes:
        * Currency Pair Assumption: This function assumes that the exchange rates stored for each account
          are appropriate for the currency pairs involved in the conversions.
        * The exchange rate for each account is based on the last encountered transaction rate that is not equal
          to 1.0 or the previous rate for that account.
            * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent
              transactions of the same account within the whole imported and existing dataset when doing `check` and
              `zakat` operations.

        Example Usage:
            The CSV file should have the following format, rate is optional per transaction:
            account, desc, value, date, rate
            For example:
            safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1
        """
        if debug:
            print('import_csv', f'debug={debug}')
        cache: list[int] = []
        try:
            with open(self.import_csv_cache_path(), 'r') as stream:
                cache = camel.load(stream.read())
        except Exception:
            # no cache file yet (or it is unreadable): start with an empty cache
            pass
        date_formats = [
            "%Y-%m-%d %H:%M:%S",
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H%M%S",
            "%Y-%m-%d",
        ]
        created, found, bad = 0, 0, {}
        data: dict[int, list] = {}
        with open(path, newline='', encoding="utf-8") as f:
            i = 0
            for row in csv.reader(f, delimiter=','):
                i += 1
                # NOTE: hash() of strings is salted per process unless PYTHONHASHSEED is fixed,
                # so hashes cached by one run may not match those computed by another run.
                hashed = hash(tuple(row))
                if hashed in cache:
                    found += 1
                    continue
                account = row[0]
                desc = row[1]
                value = float(row[2])
                rate = 1.0
                if row[4:5]:  # Empty list if index is out of range
                    rate = float(row[4])
                date: int = 0
                for time_format in date_formats:
                    try:
                        date = self.time(datetime.datetime.strptime(row[3], time_format))
                        break
                    except Exception:
                        # try the next date format
                        pass
                # TODO: not allowed for negative dates in the future after enhance time functions
                if date == 0 or value == 0:
                    # `row` is a list, so append the reason as a list item
                    bad[i] = row + ['invalid date']
                    continue
                if date not in data:
                    data[date] = []
                data[date].append((i, account, desc, value, date, rate, hashed))

        if debug:
            print('import_csv', len(data))

        if bad:
            return created, found, bad

        for date, rows in sorted(data.items()):
            try:
                len_rows = len(rows)
                if len_rows == 1:
                    (_, account, desc, value, date, rate, hashed) = rows[0]
                    if rate > 0:
                        self.exchange(account, created=date, rate=rate)
                    if value > 0:
                        self.track(value, desc, account, True, date)
                    elif value < 0:
                        self.sub(-value, desc, account, date)
                    created += 1
                    cache.append(hashed)
                    continue
                if debug:
                    print('-- Duplicated time detected', date, 'len', len_rows)
                    print(rows)
                    print('---------------------------------')
                # If records are found at the same time with different accounts in the same amount
                # (one positive and the other negative), this indicates it is a transfer.
                if len_rows != 2:
                    raise Exception(f'more than two transactions({len_rows}) at the same time')
                (i, account1, desc1, value1, date1, rate1, _) = rows[0]
                (j, account2, desc2, value2, date2, rate2, _) = rows[1]
                if account1 == account2 or desc1 != desc2 or abs(value1) != abs(value2) or date1 != date2:
                    raise Exception('invalid transfer')
                if rate1 > 0:
                    self.exchange(account1, created=date1, rate=rate1)
                if rate2 > 0:
                    self.exchange(account2, created=date2, rate=rate2)
                values = {
                    value1: account1,
                    value2: account2,
                }
                self.transfer(
                    unscaled_amount=abs(value1),
                    from_account=values[min(values.keys())],
                    to_account=values[max(values.keys())],
                    desc=desc1,
                    created=date1,
                )
            except Exception as e:
                for (i, account, desc, value, date, rate, _) in rows:
                    bad[i] = (account, desc, value, date, rate, e)
                break
        with open(self.import_csv_cache_path(), 'w') as stream:
            stream.write(camel.dump(cache))
        return created, found, bad

    ########
    # TESTS #
    #######

    @staticmethod
    def human_readable_size(size: float, decimal_places: int = 2) -> str:
        """
        Converts a size in bytes to a human-readable format (e.g., KB, MB, GB).

        This function iterates through progressively larger units of information
        (B, KB, MB, GB, etc.)
        and divides the input size until it fits within a
        range that can be expressed with a reasonable number before the unit.

        Parameters:
            size (float): The size in bytes to convert.
            decimal_places (int, optional): The number of decimal places to display
                in the result. Defaults to 2.

        Returns:
            str: A string representation of the size in a human-readable format,
                rounded to the specified number of decimal places. For example:
                - "1.50 KB" (1536 bytes)
                - "23.00 MB" (24117248 bytes)
                - "1.23 GB" (1325899906 bytes)
        """
        if type(size) not in (float, int):
            raise TypeError("size must be a float or integer")
        if type(decimal_places) != int:
            raise TypeError("decimal_places must be an integer")
        for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
            if size < 1024.0:
                break
            size /= 1024.0
        return f"{size:.{decimal_places}f} {unit}"

    @staticmethod
    def get_dict_size(obj: dict, seen: set = None) -> float:
        """
        Recursively calculates the approximate memory size of a dictionary and its contents in bytes.

        This function traverses the dictionary structure, accounting for the size of keys, values,
        and any nested objects. It handles various data types commonly found in dictionaries
        (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case
        of circular references.

        Parameters:
            obj (dict): The dictionary whose size is to be calculated.
            seen (set, optional): A set used internally to track visited objects
                and avoid circular references. Defaults to None.

        Returns:
            float: An approximate size of the dictionary and its contents in bytes.

        Note:
            - This function is a method of the `ZakatTracker` class and is likely used to
              estimate the memory footprint of data structures relevant to Zakat calculations.
            - The size calculation is approximate as it relies on `sys.getsizeof()`, which might
              not account for all memory overhead depending on the Python implementation.
            - Circular references are handled to prevent infinite recursion.
            - Basic numeric types (int, float, complex) are assumed to have fixed sizes.
            - String sizes are estimated based on character length and encoding.
        """
        size = 0
        if seen is None:
            seen = set()

        obj_id = id(obj)
        if obj_id in seen:
            return 0

        seen.add(obj_id)
        size += sys.getsizeof(obj)

        if isinstance(obj, dict):
            for k, v in obj.items():
                size += ZakatTracker.get_dict_size(k, seen)
                size += ZakatTracker.get_dict_size(v, seen)
        elif isinstance(obj, (list, tuple, set, frozenset)):
            for item in obj:
                size += ZakatTracker.get_dict_size(item, seen)
        elif isinstance(obj, (int, float, complex)):  # Handle numbers
            pass  # Basic numbers have a fixed size, so nothing to add here
        elif isinstance(obj, str):  # Handle strings
            # Rough over-estimate: character count times the size of an empty bytes object
            size += len(obj) * sys.getsizeof(str().encode())
        return size

    @staticmethod
    def duration_from_nanoseconds(ns: int,
                                  show_zeros_in_spoken_time: bool = False,
                                  spoken_time_separator=',',
                                  millennia: str = 'Millennia',
                                  century: str = 'Century',
                                  years: str = 'Years',
                                  days: str = 'Days',
                                  hours: str = 'Hours',
                                  minutes: str = 'Minutes',
                                  seconds: str = 'Seconds',
                                  milli_seconds: str = 'MilliSeconds',
                                  micro_seconds: str = 'MicroSeconds',
                                  nano_seconds: str = 'NanoSeconds',
                                  ) -> tuple:
        """
        REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106
        Convert nanoseconds to a human-readable time format.
        A nanosecond is a unit of time in the International System of Units (SI) equal
        to one billionth (0.000000001 or 10−9 or 1⁄1,000,000,000) of a second.
        Its symbol is ns.
        A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond.

        INPUT : ns (AKA: NanoSeconds)
        OUTPUT: tuple(string time_lapsed, string spoken_time) like format.
        OUTPUT Variables: time_lapsed, spoken_time

        Example Input: duration_from_nanoseconds(ns)
        **"Millennium:Century:Years:Days:Hours:Minutes:Seconds::MilliSeconds::MicroSeconds::NanoSeconds"**
        Example Output: ('039:0001:047:325:05:02:03::456::789::012', ' 39 Millennia,   1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds')
        duration_from_nanoseconds(1234567890123456789012)
        """
        us, ns = divmod(ns, 1000)
        ms, us = divmod(us, 1000)
        s, ms = divmod(ms, 1000)
        m, s = divmod(s, 60)
        h, m = divmod(m, 60)
        d, h = divmod(h, 24)
        y, d = divmod(d, 365)
        c, y = divmod(y, 100)
        n, c = divmod(c, 10)
        time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}"
        spoken_time_part = []
        if n > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{n: 3d} {millennia}")
        if c > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{c: 4d} {century}")
        if y > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{y: 3d} {years}")
        if d > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{d: 4d} {days}")
        if h > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{h: 2d} {hours}")
        if m > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{m: 2d} {minutes}")
        if s > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{s: 2d} {seconds}")
        if ms > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{ms: 3d} {milli_seconds}")
        if us > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{us: 3d} {micro_seconds}")
        if ns > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{ns: 3d} {nano_seconds}")
        return time_lapsed, spoken_time_separator.join(spoken_time_part)

    @staticmethod
    def day_to_time(day: int, month: int = 6, year: int = 2024) -> int:  # assumes the month is June and the year is 2024
        """
        Convert a specific day, month, and year into a timestamp.

        Parameters:
            day (int): The day of the month.
            month (int): The month of the year. Default is 6 (June).
            year (int): The year. Default is 2024.

        Returns:
            int: The timestamp representing the given day, month, and year.

        Note:
            This method assumes the default month and year if not provided.
        """
        return ZakatTracker.time(datetime.datetime(year, month, day))

    @staticmethod
    def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime:
        """
        Generate a random date between two given dates.

        Parameters:
            start_date (datetime.datetime): The start date from which to generate a random date.
            end_date (datetime.datetime): The end date until which to generate a random date.

        Returns:
            datetime.datetime: A random date between the start_date and end_date.
        """
        time_between_dates = end_date - start_date
        days_between_dates = time_between_dates.days
        random_number_of_days = random.randrange(days_between_dates)
        return start_date + datetime.timedelta(days=random_number_of_days)

    @staticmethod
    def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False,
                                 debug: bool = False) -> int:
        """
        Generate a random CSV file with specified parameters.

        Parameters:
            path (str): The path where the CSV file will be saved. Default is "data.csv".
            count (int): The number of rows to generate in the CSV file. Default is 1000.
            with_rate (bool): If True, a random rate between 0.12 and 12.0 is added to each row. Default is False.
            debug (bool): A flag indicating whether to print debug information.

        Returns:
            int: The number of rows written. The function generates a CSV file at the specified path
            with the given count of rows.
            Each row contains a randomly generated account, description, value, and date.
            The value is randomly generated between 1000 and 100000,
            and the date is randomly generated between 1000-01-01 and 2023-12-31.
            If the zero-based row index is not divisible by 13, the value is multiplied by -1.
        """
        if debug:
            print('generate_random_csv_file', f'debug={debug}')
        i = 0
        with open(path, "w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            for i in range(count):
                account = f"acc-{random.randint(1, 1000)}"
                desc = f"Some text {random.randint(1, 1000)}"
                value = random.randint(1000, 100000)
                date = ZakatTracker.generate_random_date(datetime.datetime(1000, 1, 1),
                                                         datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S")
                if not i % 13 == 0:
                    value *= -1
                row = [account, desc, value, date]
                if with_rate:
                    rate = random.randint(1, 100) * 0.12
                    if debug:
                        print('before-append', row)
                    row.append(rate)
                    if debug:
                        print('after-append', row)
                writer.writerow(row)
                i = i + 1
        return i

    @staticmethod
    def create_random_list(max_sum, min_value=0, max_value=10):
        """
        Creates a list of random integers whose sum does not exceed the specified maximum.

        Args:
            max_sum: The maximum allowed sum of the list elements.
            min_value: The minimum possible value for an element (inclusive).
            max_value: The maximum possible value for an element (inclusive).

        Returns:
            A list of random integers.
        """
        result = []
        current_sum = 0

        while current_sum < max_sum:
            # Calculate the remaining space for the next element
            remaining_sum = max_sum - current_sum
            # Determine the maximum possible value for the next element
            next_max_value = min(remaining_sum, max_value)
            # Generate a random element within the allowed range
            next_element = random.randint(min_value, next_max_value)
            result.append(next_element)
            current_sum += next_element

        return result

    def _test_core(self, restore=False, debug=False):

        if debug:
            random.seed(1234567890)

        # sanity check - random forward time

        xlist = []
        limit = 1000
        for _ in range(limit):
            y = ZakatTracker.time()
            z = '-'
            if y not in xlist:
                xlist.append(y)
            else:
                z = 'x'
            if debug:
                print(z, y)
        xx = len(xlist)
        if debug:
            print('count', xx, ' - unique: ', (xx / limit) * 100, '%')
        assert limit == xx

        # sanity check - convert date since 1000AD

        for year in range(1000, 9000):
            ns = ZakatTracker.time(datetime.datetime.strptime(f"{year}-12-30 18:30:45", "%Y-%m-%d %H:%M:%S"))
            date = ZakatTracker.time_to_datetime(ns)
            if debug:
                print(date)
            assert date.year == year
            assert date.month == 12
            assert date.day == 30
            assert date.hour == 18
            assert date.minute == 30
            assert date.second in [44, 45]

        # human_readable_size

        assert ZakatTracker.human_readable_size(0) == "0.00 B"
        assert ZakatTracker.human_readable_size(512) == "512.00 B"
        assert ZakatTracker.human_readable_size(1023) == "1023.00 B"

        assert ZakatTracker.human_readable_size(1024) == "1.00 KB"
        assert ZakatTracker.human_readable_size(2048) == "2.00 KB"
        assert ZakatTracker.human_readable_size(5120) == "5.00 KB"

        assert ZakatTracker.human_readable_size(1024 ** 2) == "1.00 MB"
        assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2) == "2.50 MB"

        assert ZakatTracker.human_readable_size(1024 ** 3) == "1.00 GB"
        assert ZakatTracker.human_readable_size(1024 ** 4) == "1.00 TB"
        assert ZakatTracker.human_readable_size(1024 ** 5) == "1.00 PB"

        assert ZakatTracker.human_readable_size(1536, decimal_places=0) == "2 KB"
        assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2, decimal_places=1) == "2.5 MB"
        assert ZakatTracker.human_readable_size(1234567890, decimal_places=3) == "1.150 GB"

        try:
            ZakatTracker.human_readable_size("not a number")
            assert False, "Expected TypeError for invalid input"
        except TypeError:
            pass

        try:
            ZakatTracker.human_readable_size(1024, decimal_places="not an int")
            assert False, "Expected TypeError for invalid decimal_places"
        except TypeError:
            pass

        # get_dict_size
        assert ZakatTracker.get_dict_size({}) == sys.getsizeof({}), "Empty dictionary size mismatch"
        assert ZakatTracker.get_dict_size({"a": 1, "b": 2.5, "c": True}) != sys.getsizeof({}), "Not Empty dictionary"

        # number scale
        error = 0
        total = 0
        for sign in ['', '-']:
            for max_i, max_j, decimal_places in [
                (101, 101, 2),  # fiat currency minimum unit took 2 decimal places
                (1, 1_000, 8),  # cryptocurrency like Satoshi in Bitcoin took 8 decimal places
                (1, 1_000, 18)  # cryptocurrency like Wei in Ethereum took 18 decimal places
            ]:
                for return_type in (
                        float,
                        Decimal,
                ):
                    for i in range(max_i):
                        for j in range(max_j):
                            total += 1
                            num_str = f'{sign}{i}.{j:0{decimal_places}d}'
                            num = return_type(num_str)
                            scaled = self.scale(num, decimal_places=decimal_places)
                            unscaled = self.unscale(scaled, return_type=return_type, decimal_places=decimal_places)
                            if debug:
                                print(
                                    f'return_type: {return_type}, num_str: {num_str} - num: {num} - scaled: {scaled} - unscaled: {unscaled}')
                            if unscaled != num:
                                if debug:
                                    print('***** SCALE ERROR *****')
                                error += 1
        if debug:
            print(f'total: {total}, error({error}): {100 * error / total}%')
        assert error == 0

        assert self.nolock()
        assert self._history() is True

        table = {
            1: [
                (0, 10, 1000, 1000, 1000, 1, 1),
                (0, 20, 3000, 3000, 3000, 2, 2),
                (0, 30, 6000, 6000, 6000, 3, 3),
                (1, 15, 4500, 4500, 4500, 3, 4),
                (1, 50, -500, -500, -500, 4, 5),
                (1, 100, -10500, -10500, -10500, 5, 6),
            ],
            'wallet': [
                (1, 90, -9000, -9000, -9000, 1, 1),
                (0, 100, 1000, 1000, 1000, 2, 2),
                (1, 190, -18000, -18000, -18000, 3, 3),
                (0, 1000, 82000, 82000, 82000, 4, 4),
            ],
        }
        for x in table:
            for y in table[x]:
                self.lock()
                if y[0] == 0:
                    ref = self.track(
                        unscaled_value=y[1],
                        desc='test-add',
                        account=x,
                        logging=True,
                        created=ZakatTracker.time(),
                        debug=debug,
                    )
                else:
                    (ref, z) = self.sub(
                        unscaled_value=y[1],
                        desc='test-sub',
                        account=x,
                        created=ZakatTracker.time(),
                    )
                    if debug:
                        print('_sub', z, ZakatTracker.time())
                assert ref != 0
                assert len(self._vault['account'][x]['log'][ref]['file']) == 0
                for i in range(3):
                    file_ref = self.add_file(x, ref, 'file_' + str(i))
                    sleep(0.0000001)
                    assert file_ref != 0
                    if debug:
                        print('ref', ref, 'file', file_ref)
                    assert len(self._vault['account'][x]['log'][ref]['file']) == i + 1
                file_ref = self.add_file(x, ref, 'file_' + str(3))
                assert self.remove_file(x, ref, file_ref)
                z = self.balance(x)
                if debug:
                    print("debug-0", z, y)
                assert z == y[2]
                z = self.balance(x, False)
                if debug:
                    print("debug-1", z, y[3])
                assert z == y[3]
                o = self._vault['account'][x]['log']
                z = 0
                for i in o:
                    z += o[i]['value']
                if debug:
                    print("debug-2", z, type(z))
                    print("debug-2", y[4], type(y[4]))
                assert z == y[4]
                if debug:
                    print('debug-2 - PASSED')
                assert self.box_size(x) == y[5]
                assert self.log_size(x) == y[6]
                assert not self.nolock()
                self.free(self.lock())
                assert self.nolock()
            assert self.boxes(x) != {}
            assert self.logs(x) != {}

            assert not self.hide(x)
            assert self.hide(x, False) is False
            assert self.hide(x) is False
            assert self.hide(x, True)
            assert self.hide(x)

            assert self.zakatable(x)
            assert self.zakatable(x, False) is False
            assert self.zakatable(x) is False
            assert self.zakatable(x, True)
            assert self.zakatable(x)

        if restore is True:
            count = len(self._vault['history'])
            if debug:
                print('history-count', count)
            assert count == 10
            # try mode
            for _ in range(count):
                assert self.recall(True, debug)
            count = len(self._vault['history'])
            if debug:
                print('history-count', count)
            assert count == 10
            _accounts = list(table.keys())
            accounts_limit = len(_accounts) + 1
            for i in range(-1, -accounts_limit, -1):
                account = _accounts[i]
                if debug:
                    print(account, len(table[account]))
                transaction_limit = len(table[account]) + 1
                for j in range(-1, -transaction_limit, -1):
                    row = table[account][j]
                    if debug:
                        print(row, self.balance(account), self.balance(account, False))
                    assert self.balance(account) == self.balance(account, False)
                    assert self.balance(account) == row[2]
                    assert self.recall(False, debug)
            assert self.recall(False, debug) is False
            count = len(self._vault['history'])
            if debug:
                print('history-count', count)
            assert count == 0
            self.reset()

    def test(self, debug: bool = False) -> bool:
        if debug:
            print('test', f'debug={debug}')
        try:

            self._test_core(True, debug)
            self._test_core(False, debug)

            assert self._history()

            # Not allowed for duplicate transactions in the same account and time

            created = ZakatTracker.time()
            self.track(100, 'test-1', 'same', True, created)
            failed = False
            try:
                self.track(50, 'test-1', 'same', True, created)
            except:
                failed = True
            assert failed is True

            self.reset()

            # Same account transfer
            for x in [1, 'a', True, 1.8, None]:
                failed = False
                try:
                    self.transfer(1, x, x, 'same-account', debug=debug)
                except:
                    failed = True
                assert failed is True

            # Always preserve box age during transfer

            series: list[tuple] = [
                (30, 4),
                (60, 3),
                (90, 2),
            ]
            case = {
                3000: {
                    'series': series,
                    'rest': 15000,
                },
                6000: {
                    'series': series,
                    'rest': 12000,
                },
                9000: {
                    'series': series,
                    'rest': 9000,
                },
                18000: {
                    'series': series,
                    'rest': 0,
                },
                27000: {
                    'series': series,
                    'rest': -9000,
                },
                36000: {
                    'series': series,
                    'rest': -18000,
                },
            }

            selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle()

            for total in case:
                if debug:
                    print('--------------------------------------------------------')
                    print(f'case[{total}]', case[total])
                for x in case[total]['series']:
                    self.track(
                        unscaled_value=x[0],
                        desc=f"test-{x} ages",
                        account='ages',
                        logging=True,
                        created=selected_time * x[1],
                    )

                unscaled_total = self.unscale(total)
                if debug:
                    print('unscaled_total', unscaled_total)
                refs = self.transfer(
                    unscaled_amount=unscaled_total,
                    from_account='ages',
                    to_account='future',
                    desc='Zakat Movement',
                    debug=debug,
                )

                if debug:
                    print('refs', refs)

                ages_cache_balance = self.balance('ages')
                ages_fresh_balance = self.balance('ages', False)
                rest = case[total]['rest']
                if debug:
                    print('source', ages_cache_balance, ages_fresh_balance, rest)
                assert ages_cache_balance == rest
                assert ages_fresh_balance == rest

                future_cache_balance = self.balance('future')
                future_fresh_balance = self.balance('future', False)
                if debug:
                    print('target', future_cache_balance, future_fresh_balance, total)
                    print('refs', refs)
                assert future_cache_balance == total
                assert future_fresh_balance == total

                # TODO: check boxes times for `ages` should equal box times in `future`
                for ref in self._vault['account']['ages']['box']:
                    ages_capital = self._vault['account']['ages']['box'][ref]['capital']
                    ages_rest = self._vault['account']['ages']['box'][ref]['rest']
                    future_capital = 0
                    if ref in self._vault['account']['future']['box']:
                        future_capital = self._vault['account']['future']['box'][ref]['capital']
                    future_rest = 0
                    if ref in self._vault['account']['future']['box']:
                        future_rest = self._vault['account']['future']['box'][ref]['rest']
                    if ages_capital != 0 and future_capital != 0 and future_rest != 0:
                        if debug:
                            print('================================================================')
                            print('ages', ages_capital, ages_rest)
                            print('future', future_capital, future_rest)
                        if ages_rest == 0:
                            assert ages_capital == future_capital
                        elif ages_rest < 0:
                            assert -ages_capital == future_capital
                        elif ages_rest > 0:
                            assert ages_capital == ages_rest + future_capital
                self.reset()
                assert len(self._vault['history']) == 0

            assert self._history()
            assert self._history(False) is False
            assert self._history() is False
            assert self._history(True)
            assert self._history()
            if debug:
                print('####################################################################')

            transaction = [
                (
                    20, 'wallet', 1, -2000, -2000, -2000, 1, 1,
                    2000, 2000, 2000, 1, 1,
                ),
                (
                    750, 'wallet', 'safe',
                    -77000, -77000, -77000, 2, 2,
                    75000, 75000, 75000, 1, 1,
                ),
                (
                    600, 'safe', 'bank', 15000, 15000, 15000, 1, 2,
                    60000, 60000, 60000, 1, 1,
                ),
            ]
            for z in transaction:
                self.lock()
                x = z[1]
                y = z[2]
                self.transfer(
                    unscaled_amount=z[0],
                    from_account=x,
                    to_account=y,
                    desc='test-transfer',
                    debug=debug,
                )
                zz = self.balance(x)
                if debug:
                    print(zz, z)
                assert zz == z[3]
                xx = self.accounts()[x]
                assert xx == z[3]
                assert self.balance(x, False) == z[4]
                assert xx == z[4]

                s = 0
                log = self._vault['account'][x]['log']
                for i in log:
                    s += log[i]['value']
                if debug:
                    print('s', s, 'z[5]', z[5])
                assert s == z[5]

                assert self.box_size(x) == z[6]
                assert self.log_size(x) == z[7]

                yy = self.accounts()[y]
                assert self.balance(y) == z[8]
                assert yy == z[8]
                assert self.balance(y, False) == z[9]
                assert yy == z[9]

                s = 0
                log = self._vault['account'][y]['log']
                for i in log:
                    s += log[i]['value']
                assert s == z[10]

                assert self.box_size(y) == z[11]
                assert self.log_size(y) == z[12]
                assert self.free(self.lock())

            if debug:
                pp().pprint(self.check(2.17))

            assert not self.nolock()
            history_count = len(self._vault['history'])
            if debug:
                print('history-count', history_count)
            assert history_count == 4
            assert not self.free(ZakatTracker.time())
            assert self.free(self.lock())
            assert self.nolock()
            assert len(self._vault['history']) == 3

            # storage

            _path = self.path(f'test.{self.ext()}')
            if os.path.exists(_path):
                os.remove(_path)
            self.save()
            assert os.path.getsize(_path) > 0
            self.reset()
            assert self.recall(False, debug) is False
            self.load()
            assert self._vault['account'] is not None

            # recall

            assert self.nolock()
            assert len(self._vault['history']) == 3
            assert self.recall(False, debug) is True
            assert len(self._vault['history']) == 2
            assert self.recall(False, debug) is True
            assert len(self._vault['history']) == 1
            assert self.recall(False, debug) is True
            assert len(self._vault['history']) == 0
            assert self.recall(False, debug) is False
            assert len(self._vault['history']) == 0

            # exchange

            self.exchange("cash", 25, 3.75, "2024-06-25")
            self.exchange("cash", 22, 3.73, "2024-06-22")
            self.exchange("cash", 15, 3.69, "2024-06-15")
            self.exchange("cash", 10, 3.66)

            for i in range(1, 30):
                exchange = self.exchange("cash", i)
                rate, description, created = exchange['rate'], exchange['description'], exchange['time']
                if debug:
                    print(i, rate, description, created)
                assert created
                if i < 10:
                    assert rate == 1
                    assert description is None
                elif i == 10:
                    assert rate == 3.66
                    assert description is None
                elif i < 15:
                    assert rate == 3.66
                    assert description is None
                elif i == 15:
                    assert rate == 3.69
                    assert description is not None
                elif i < 22:
                    assert rate == 3.69
                    assert description is not None
                elif i == 22:
                    assert rate == 3.73
                    assert description is not None
                elif i >= 25:
                    assert rate == 3.75
                    assert description is not None
                exchange = self.exchange("bank", i)
                rate, description, created = exchange['rate'], exchange['description'], exchange['time']
                if debug:
                    print(i, rate, description, created)
                assert created
                assert rate == 1
                assert description is None

            assert len(self._vault['exchange']) > 0
            assert len(self.exchanges()) > 0
            self._vault['exchange'].clear()
            assert len(self._vault['exchange']) == 0
            assert len(self.exchanges()) == 0

            # Store exchange rates using nanosecond timestamps
            self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25")
            self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22")
            self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15")
            self.exchange("cash", ZakatTracker.day_to_time(10), 3.66)

            for i in [x * 0.12 for x in range(-15, 21)]:
                if i <= 0:
                    assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0
                else:
                    assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0

            # Verify the results using nanosecond timestamps
            for i in range(1, 31):
                timestamp_ns = ZakatTracker.day_to_time(i)
                exchange = self.exchange("cash", timestamp_ns)
                rate, description, created = exchange['rate'], exchange['description'], exchange['time']
                if debug:
                    print(i, rate, description, created)
                assert created
                if i < 10:
                    assert rate == 1
                    assert description is None
                elif i == 10:
                    assert rate == 3.66
                    assert description is None
                elif i < 15:
                    assert rate == 3.66
                    assert description is None
                elif i == 15:
                    assert rate == 3.69
                    assert description is not None
                elif i < 22:
                    assert rate == 3.69
                    assert description is not None
                elif i == 22:
                    assert rate == 3.73
                    assert description is not None
                elif i >= 25:
                    assert rate == 3.75
                    assert description is not None
                exchange = self.exchange("bank", i)
                rate, description, created = exchange['rate'], exchange['description'], exchange['time']
                if debug:
                    print(i, rate, description, created)
                assert created
                assert rate == 1
                assert description is None

            # csv

            csv_count = 1000

            for with_rate, path in {
                False: 'test-import_csv-no-exchange',
                True: 'test-import_csv-with-exchange',
            }.items():

                if debug:
                    print('test_import_csv', with_rate, path)

                csv_path = path + '.csv'
                if os.path.exists(csv_path):
                    os.remove(csv_path)
                c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug)
                if debug:
                    print('generate_random_csv_file', c)
                assert c == csv_count
                assert os.path.getsize(csv_path) > 0
                cache_path = self.import_csv_cache_path()
                if os.path.exists(cache_path):
                    os.remove(cache_path)
                self.reset()
                (created, found, bad) = self.import_csv(csv_path, debug)
                bad_count = len(bad)
                assert bad_count > 0
                if debug:
                    print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})")
                    print('bad', bad)
                tmp_size = os.path.getsize(cache_path)
                assert tmp_size > 0
                # TODO: assert created + found + bad_count == csv_count
                # TODO: assert created == csv_count
                # TODO: assert bad_count == 0
                (created_2, found_2, bad_2) = self.import_csv(csv_path)
                bad_2_count = len(bad_2)
                if debug:
                    print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})")
                    print('bad', bad)
                assert bad_2_count > 0
                # TODO: assert tmp_size == os.path.getsize(cache_path)
                # TODO: assert created_2 + found_2 + bad_2_count == csv_count
                # TODO: assert created == found_2
                # TODO: assert bad_count == bad_2_count
                # TODO: assert found_2 == csv_count
                # TODO: assert bad_2_count == 0
                # TODO: assert created_2 == 0

                # payment parts

                positive_parts = self.build_payment_parts(100, positive_only=True)
                assert self.check_payment_parts(positive_parts) != 0
                assert self.check_payment_parts(positive_parts) != 0
                all_parts = self.build_payment_parts(300, positive_only=False)
                assert self.check_payment_parts(all_parts) != 0
                assert self.check_payment_parts(all_parts) != 0
                if debug:
                    pp().pprint(positive_parts)
                    pp().pprint(all_parts)
                # dynamic discount
                suite = []
                count = 3
                for exceed in [False, True]:
                    case = []
                    for parts in [positive_parts, all_parts]:
                        part = parts.copy()
                        demand = part['demand']
                        if debug:
                            print(demand, part['total'])
                        i = 0
                        z = demand / count
                        cp = {
                            'account': {},
                            'demand': demand,
                            'exceed': exceed,
                            'total': part['total'],
                        }
                        j = ''
                        for x, y in part['account'].items():
                            x_exchange = self.exchange(x)
                            zz = self.exchange_calc(z, 1, x_exchange['rate'])
                            if exceed and zz <= demand:
                                i += 1
                                y['part'] = zz
                                if debug:
                                    print(exceed, y)
                                cp['account'][x] = y
                                case.append(y)
                            elif not exceed and y['balance'] >= zz:
                                i += 1
                                y['part'] = zz
                                if debug:
                                    print(exceed, y)
                                cp['account'][x] = y
                                case.append(y)
                            j = x
                            if i >= count:
                                break
                        if len(cp['account'][j]) > 0:
                            suite.append(cp)
                if debug:
                    print('suite', len(suite))
                # vault = self._vault.copy()
                for case in suite:
                    # self._vault = vault.copy()
                    if debug:
                        print('case', case)
                    result = self.check_payment_parts(case)
                    if debug:
                        print('check_payment_parts', result, f'exceed: {exceed}')
                    assert result == 0

                    report = self.check(2.17, None, debug)
                    (valid, brief, plan) = report
                    if debug:
                        print('valid', valid)
                    zakat_result = self.zakat(report, parts=case, debug=debug)
                    if debug:
                        print('zakat-result', zakat_result)
                    assert valid == zakat_result

                assert self.save(path + f'.{self.ext()}')
                assert self.export_json(path + '.json')

            assert self.export_json("1000-transactions-test.json")
            assert self.save(f"1000-transactions-test.{self.ext()}")

            self.reset()

            # test transfer between accounts with different exchange rate

            a_SAR = "Bank (SAR)"
            b_USD = "Bank (USD)"
            c_SAR = "Safe (SAR)"
            # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer
            for case in [
                (0, a_SAR, "SAR Gift", 1000, 100000),
                (1, a_SAR, 1),
                (0, b_USD, "USD Gift", 500, 50000),
                (1, b_USD, 1),
                (2, b_USD, 3.75),
                (1, b_USD,
3.75), 2957 (3, 100, b_USD, a_SAR, "100 USD -> SAR", 40000, 137500), 2958 (0, c_SAR, "Salary", 750, 75000), 2959 (3, 375, c_SAR, b_USD, "375 SAR -> USD", 37500, 50000), 2960 (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 137125, 50100), 2961 ]: 2962 if debug: 2963 print('case', case) 2964 match (case[0]): 2965 case 0: # track 2966 _, account, desc, x, balance = case 2967 self.track(unscaled_value=x, desc=desc, account=account, debug=debug) 2968 2969 cached_value = self.balance(account, cached=True) 2970 fresh_value = self.balance(account, cached=False) 2971 if debug: 2972 print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value) 2973 assert cached_value == balance 2974 assert fresh_value == balance 2975 case 1: # check-exchange 2976 _, account, expected_rate = case 2977 t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2978 if debug: 2979 print('t-exchange', t_exchange) 2980 assert t_exchange['rate'] == expected_rate 2981 case 2: # do-exchange 2982 _, account, rate = case 2983 self.exchange(account, rate=rate, debug=debug) 2984 b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2985 if debug: 2986 print('b-exchange', b_exchange) 2987 assert b_exchange['rate'] == rate 2988 case 3: # transfer 2989 _, x, a, b, desc, a_balance, b_balance = case 2990 self.transfer(x, a, b, desc, debug=debug) 2991 2992 cached_value = self.balance(a, cached=True) 2993 fresh_value = self.balance(a, cached=False) 2994 if debug: 2995 print('account', a, 'cached_value', cached_value, 'fresh_value', fresh_value, 'a_balance', a_balance) 2996 assert cached_value == a_balance 2997 assert fresh_value == a_balance 2998 2999 cached_value = self.balance(b, cached=True) 3000 fresh_value = self.balance(b, cached=False) 3001 if debug: 3002 print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value) 3003 assert cached_value == b_balance 3004 assert fresh_value == b_balance 3005 3006 # Transfer all in many 
chunks randomly from B to A 3007 a_SAR_balance = 137125 3008 b_USD_balance = 50100 3009 b_USD_exchange = self.exchange(b_USD) 3010 amounts = ZakatTracker.create_random_list(b_USD_balance, max_value=1000) 3011 if debug: 3012 print('amounts', amounts) 3013 i = 0 3014 for x in amounts: 3015 if debug: 3016 print(f'{i} - transfer-with-exchange({x})') 3017 self.transfer( 3018 unscaled_amount=self.unscale(x), 3019 from_account=b_USD, 3020 to_account=a_SAR, 3021 desc=f"{x} USD -> SAR", 3022 debug=debug, 3023 ) 3024 3025 b_USD_balance -= x 3026 cached_value = self.balance(b_USD, cached=True) 3027 fresh_value = self.balance(b_USD, cached=False) 3028 if debug: 3029 print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3030 b_USD_balance) 3031 assert cached_value == b_USD_balance 3032 assert fresh_value == b_USD_balance 3033 3034 a_SAR_balance += int(x * b_USD_exchange['rate']) 3035 cached_value = self.balance(a_SAR, cached=True) 3036 fresh_value = self.balance(a_SAR, cached=False) 3037 if debug: 3038 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3039 a_SAR_balance, 'rate', b_USD_exchange['rate']) 3040 assert cached_value == a_SAR_balance 3041 assert fresh_value == a_SAR_balance 3042 i += 1 3043 3044 # Transfer all in many chunks randomly from C to A 3045 c_SAR_balance = 37500 3046 amounts = ZakatTracker.create_random_list(c_SAR_balance, max_value=1000) 3047 if debug: 3048 print('amounts', amounts) 3049 i = 0 3050 for x in amounts: 3051 if debug: 3052 print(f'{i} - transfer-with-exchange({x})') 3053 self.transfer( 3054 unscaled_amount=self.unscale(x), 3055 from_account=c_SAR, 3056 to_account=a_SAR, 3057 desc=f"{x} SAR -> a_SAR", 3058 debug=debug, 3059 ) 3060 3061 c_SAR_balance -= x 3062 cached_value = self.balance(c_SAR, cached=True) 3063 fresh_value = self.balance(c_SAR, cached=False) 3064 if debug: 3065 print('account', c_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 
'excepted', 3066 c_SAR_balance) 3067 assert cached_value == c_SAR_balance 3068 assert fresh_value == c_SAR_balance 3069 3070 a_SAR_balance += x 3071 cached_value = self.balance(a_SAR, cached=True) 3072 fresh_value = self.balance(a_SAR, cached=False) 3073 if debug: 3074 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3075 a_SAR_balance) 3076 assert cached_value == a_SAR_balance 3077 assert fresh_value == a_SAR_balance 3078 i += 1 3079 3080 assert self.export_json("accounts-transfer-with-exchange-rates.json") 3081 assert self.save(f"accounts-transfer-with-exchange-rates.{self.ext()}") 3082 3083 # check & zakat with exchange rates for many cycles 3084 3085 for rate, values in { 3086 1: { 3087 'in': [1000, 2000, 10000], 3088 'exchanged': [100000, 200000, 1000000], 3089 'out': [2500, 5000, 73140], 3090 }, 3091 3.75: { 3092 'in': [200, 1000, 5000], 3093 'exchanged': [75000, 375000, 1875000], 3094 'out': [1875, 9375, 137138], 3095 }, 3096 }.items(): 3097 a, b, c = values['in'] 3098 m, n, o = values['exchanged'] 3099 x, y, z = values['out'] 3100 if debug: 3101 print('rate', rate, 'values', values) 3102 for case in [ 3103 (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3104 {'safe': {0: {'below_nisab': x}}}, 3105 ], False, m), 3106 (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3107 {'safe': {0: {'count': 1, 'total': y}}}, 3108 ], True, n), 3109 (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [ 3110 {'cave': {0: {'count': 3, 'total': z}}}, 3111 ], True, o), 3112 ]: 3113 if debug: 3114 print(f"############# check(rate: {rate}) #############") 3115 print('case', case) 3116 self.reset() 3117 self.exchange(account=case[1], created=case[2], rate=rate) 3118 self.track(unscaled_value=case[0], desc='test-check', account=case[1], logging=True, created=case[2]) 3119 assert self.snapshot() 3120 3121 # assert self.nolock() 3122 # history_size = len(self._vault['history']) 3123 # 
print('history_size', history_size) 3124 # assert history_size == 2 3125 assert self.lock() 3126 assert not self.nolock() 3127 report = self.check(2.17, None, debug) 3128 (valid, brief, plan) = report 3129 if debug: 3130 print('brief', brief) 3131 assert valid == case[4] 3132 assert case[5] == brief[0] 3133 assert case[5] == brief[1] 3134 3135 if debug: 3136 pp().pprint(plan) 3137 3138 for x in plan: 3139 assert case[1] == x 3140 if 'total' in case[3][0][x][0].keys(): 3141 assert case[3][0][x][0]['total'] == int(brief[2]) 3142 assert int(plan[x][0]['total']) == case[3][0][x][0]['total'] 3143 assert int(plan[x][0]['count']) == case[3][0][x][0]['count'] 3144 else: 3145 assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab'] 3146 if debug: 3147 pp().pprint(report) 3148 result = self.zakat(report, debug=debug) 3149 if debug: 3150 print('zakat-result', result, case[4]) 3151 assert result == case[4] 3152 report = self.check(2.17, None, debug) 3153 (valid, brief, plan) = report 3154 assert valid is False 3155 3156 history_size = len(self._vault['history']) 3157 if debug: 3158 print('history_size', history_size) 3159 assert history_size == 3 3160 assert not self.nolock() 3161 assert self.recall(False, debug) is False 3162 self.free(self.lock()) 3163 assert self.nolock() 3164 3165 for i in range(3, 0, -1): 3166 history_size = len(self._vault['history']) 3167 if debug: 3168 print('history_size', history_size) 3169 assert history_size == i 3170 assert self.recall(False, debug) is True 3171 3172 assert self.nolock() 3173 assert self.recall(False, debug) is False 3174 3175 history_size = len(self._vault['history']) 3176 if debug: 3177 print('history_size', history_size) 3178 assert history_size == 0 3179 3180 account_size = len(self._vault['account']) 3181 if debug: 3182 print('account_size', account_size) 3183 assert account_size == 0 3184 3185 report_size = len(self._vault['report']) 3186 if debug: 3187 print('report_size', report_size) 3188 assert report_size == 
0 3189 3190 assert self.nolock() 3191 return True 3192 except: 3193 # pp().pprint(self._vault) 3194 assert self.export_json("test-snapshot.json") 3195 assert self.save(f"test-snapshot.{self.ext()}") 3196 raise
A class for tracking and calculating Zakat.
This class provides functionalities for recording transactions, calculating Zakat due, and managing account balances. It also offers features like importing transactions from CSV files, exporting data to JSON format, and saving/loading the tracker state.
The `ZakatTracker` class is designed to handle both positive and negative transactions, allowing for flexible tracking of financial activities related to Zakat. It also supports the concepts of "Nisab" (the minimum threshold for Zakat) and "haul" (one complete year of holding for a transaction), and can calculate the Zakat due based on the current silver price.
The class uses a camel file as its database to persist the tracker state, ensuring data integrity across sessions. It also provides options for enabling or disabling history tracking, allowing users to choose their preferred level of detail.
In addition, the `ZakatTracker` class includes various helper methods like `time`, `time_to_datetime`, `lock`, `free`, `recall`, `export_json`, and more. These methods provide additional functionality and flexibility for interacting with and managing the Zakat tracker.
Attributes:
    ZakatTracker.ZakatCut (function): A function to calculate the Zakat amount (2.5% of a value).
    ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat (one lunar year, in nanoseconds).
    ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price.
    ZakatTracker.Version (function): A function returning the version of the ZakatTracker class.
Data Structure: The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data.
_vault (dict):
    - account (dict):
        - {account_number} (dict):
            - balance (int): The current balance of the account.
            - box (dict): A dictionary storing transaction details.
                - {timestamp} (dict):
                    - capital (int): The initial amount of the transaction.
                    - count (int): The number of times Zakat has been calculated for this transaction.
                    - last (int): The timestamp of the last Zakat calculation.
                    - rest (int): The remaining amount after Zakat deductions and withdrawal.
                    - total (int): The total Zakat deducted from this transaction.
            - count (int): The total number of transactions for the account.
            - log (dict): A dictionary storing transaction logs.
                - {timestamp} (dict):
                    - value (int): The transaction amount (positive or negative).
                    - desc (str): The description of the transaction.
                    - ref (int): The box reference (positive or None).
                    - file (dict): A dictionary storing file references associated with the transaction.
            - hide (bool): Indicates whether the account is hidden or not.
            - zakatable (bool): Indicates whether the account is subject to Zakat.
    - exchange (dict):
        - account (dict):
            - {timestamp} (dict):
                - rate (float): Exchange rate when compared to local currency.
                - description (str): The description of the exchange rate.
    - history (dict):
        - {timestamp} (list): A list of dictionaries storing the history of actions performed.
            - {action_dict} (dict):
                - action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT).
                - account (str): The account number associated with the action.
                - ref (int): The reference number of the transaction.
                - file (int): The reference number of the file (if applicable).
                - key (str): The key associated with the action (e.g., 'rest', 'total').
                - value (int): The value associated with the action.
                - math (MathOperation): The mathematical operation performed (if applicable).
    - lock (int or None): The timestamp indicating the current lock status (None if not locked).
    - report (dict):
        - {timestamp} (tuple): A tuple storing Zakat report details.
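To make the layout above concrete, here is a minimal sketch of a vault holding one account with a single transaction. The account name, timestamp, and amounts are made-up sample data, and using the same timestamp as both the box key and the log `ref` is an assumption for illustration, not something the documentation states.

```python
# Hypothetical sample data: account name, timestamp and amounts are invented.
ts = 1_719_000_000_000_000_000  # a nanosecond timestamp used as box/log key

vault = {
    'account': {
        'cash': {
            'balance': 100_000,  # scaled integer, e.g. 1000.00 stored as 100000
            'box': {
                ts: {'capital': 100_000, 'count': 0, 'last': 0,
                     'rest': 100_000, 'total': 0},
            },
            'count': 1,
            'log': {
                # 'ref' pointing at the box timestamp is an assumption
                ts: {'value': 100_000, 'desc': 'initial deposit',
                     'ref': ts, 'file': {}},
            },
            'hide': False,
            'zakatable': True,
        },
    },
    'exchange': {},
    'history': {},
    'lock': None,   # None means "not locked"
    'report': {},
}

# The five top-level keys match what reset() creates.
top_level_keys = sorted(vault)
```

The empty `exchange`, `history`, and `report` dictionaries and the `None` lock mirror the initial state set by `reset()`.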
    def __init__(self, db_path: str = "zakat.camel", history_mode: bool = True):
        """
        Initialize ZakatTracker with database path and history mode.

        Parameters:
            db_path (str): The path to the database file. Default is "zakat.camel".
            history_mode (bool): The mode for tracking history. Default is True.

        Returns:
            None
        """
        self._base_path = None
        self._vault_path = None
        self._vault = None
        self.reset()
        self._history(history_mode)
        self.path(db_path)
        self.load()
Initialize ZakatTracker with database path and history mode.
Parameters:
    db_path (str): The path to the database file. Default is "zakat.camel".
    history_mode (bool): The mode for tracking history. Default is True.
Returns: None
    @staticmethod
    def Version() -> str:
        """
        Returns the current version of the software.

        This function returns a string representing the current version of the software,
        including major, minor, and patch version numbers in the format "X.Y.Z".

        Returns:
            str: The current version of the software.
        """
        return '0.2.82'
Returns the current version of the software.
This function returns a string representing the current version of the software, including major, minor, and patch version numbers in the format "X.Y.Z".
Returns: str: The current version of the software.
    @staticmethod
    def ZakatCut(x: float) -> float:
        """
        Calculates the Zakat amount due on an asset.

        This function calculates the zakat amount due on a given asset value over one lunar year.
        Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth
        that exceeds a certain threshold (Nisab).

        Parameters:
            x: The total value of the asset on which Zakat is to be calculated.

        Returns:
            The amount of Zakat due on the asset, calculated as 2.5% of the asset's value.
        """
        return 0.025 * x  # Zakat Cut in one Lunar Year
Calculates the Zakat amount due on an asset.
This function calculates the zakat amount due on a given asset value over one lunar year. Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth that exceeds a certain threshold (Nisab).
Parameters: x: The total value of the asset on which Zakat is to be calculated.
Returns: The amount of Zakat due on the asset, calculated as 2.5% of the asset's value.
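As a quick illustration of the 2.5% rule, the sketch below re-implements the one-line formula as a standalone function. The name `zakat_cut` and the sample amount are hypothetical; the real method is the static `ZakatTracker.ZakatCut`.

```python
def zakat_cut(x: float) -> float:
    """Standalone copy of the 2.5% formula, for illustration only."""
    return 0.025 * x


# Hypothetical asset value of 10,000 in local currency.
due = zakat_cut(10_000)
print(f"Zakat due: {due:.2f}")
```

Because the result is a float, compare it with a tolerance rather than exact equality when testing.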
    @staticmethod
    def TimeCycle(days: int = 355) -> int:
        """
        Calculates the approximate duration of a lunar year in nanoseconds.

        This function calculates the approximate duration of a lunar year based on the given number of days.
        It converts the given number of days into nanoseconds for use in high-precision timing applications.

        Parameters:
            days: The number of days in a lunar year. Defaults to 355,
                which is an approximation of the average length of a lunar year.

        Returns:
            The approximate duration of a lunar year in nanoseconds.
        """
        return int(60 * 60 * 24 * days * 1e9)  # Lunar Year in nanoseconds
Calculates the approximate duration of a lunar year in nanoseconds.
This function calculates the approximate duration of a lunar year based on the given number of days. It converts the given number of days into nanoseconds for use in high-precision timing applications.
Parameters: days: The number of days in a lunar year. Defaults to 355, which is an approximation of the average length of a lunar year.
Returns: The approximate duration of a lunar year in nanoseconds.
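The conversion is plain unit arithmetic: seconds per day, times days, times nanoseconds per second. The standalone sketch below (the function name `time_cycle` is hypothetical) uses integer arithmetic only, which gives the same value as the float-based expression for the default of 355 days.

```python
NS_PER_SECOND = 10 ** 9
SECONDS_PER_DAY = 60 * 60 * 24


def time_cycle(days: int = 355) -> int:
    """Duration of `days` days in nanoseconds, using integers only."""
    return SECONDS_PER_DAY * days * NS_PER_SECOND


cycle_ns = time_cycle()
# Converting back to days recovers the input.
cycle_days = cycle_ns // (SECONDS_PER_DAY * NS_PER_SECOND)
```

Keeping the computation in integers avoids any dependence on float rounding for unusually large `days` values.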
    @staticmethod
    def Nisab(gram_price: float, gram_quantity: float = 595) -> float:
        """
        Calculate the total value of the Nisab (the minimum wealth threshold for Zakat) based on the given price per gram.

        This function calculates the Nisab value, which is the minimum threshold of wealth,
        that makes an individual liable for paying Zakat.
        The Nisab value is determined by the equivalent value of a specific amount
        of gold or silver (currently 595 grams in silver) in the local currency.

        Parameters:
            - gram_price (float): The price per gram of the Nisab metal (silver by default).
            - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver.

        Returns:
            - float: The total value of Nisab based on the given price per gram.
        """
        return gram_price * gram_quantity
Calculate the total value of the Nisab (the minimum wealth threshold for Zakat) based on the given price per gram.
This function calculates the Nisab value, which is the minimum threshold of wealth, that makes an individual liable for paying Zakat. The Nisab value is determined by the equivalent value of a specific amount of gold or silver (currently 595 grams in silver) in the local currency.
Parameters:
- gram_price (float): The price per gram of the Nisab metal (silver by default).
- gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver.
Returns:
- float: The total value of Nisab based on the given price per gram.
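A short sketch of how the threshold might be used. The function `nisab` is a standalone copy of the formula above. The price of 2.17 is borrowed from the `check(2.17, ...)` calls in the test code, and treating it as a silver price per gram is an assumption. The amounts 1000 and 2000 mirror the test's below-Nisab and above-Nisab cases at rate 1.

```python
def nisab(gram_price: float, gram_quantity: float = 595) -> float:
    """Standalone copy of the Nisab formula, for illustration only."""
    return gram_price * gram_quantity


threshold = nisab(2.17)  # roughly 1291.15 in local currency

# Hypothetical holdings, compared against the threshold.
below = 1000 < threshold   # holdings of 1000 do not reach the Nisab
above = 2000 >= threshold  # holdings of 2000 do
```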
    @staticmethod
    def ext() -> str:
        """
        Returns the file extension used by the ZakatTracker class.

        Returns:
            str: The file extension used by the ZakatTracker class, which is 'camel'.
        """
        return 'camel'
Returns the file extension used by the ZakatTracker class.
Returns: str: The file extension used by the ZakatTracker class, which is 'camel'.
    def path(self, path: str = None) -> str:
        """
        Set or get the path to the database file.

        If no path is provided, the current path is returned.
        If a path is provided, it is set as the new path.
        The function also creates the necessary directories if the provided path is a file.

        Parameters:
            path (str): The new path to the database file. If not provided, the current path is returned.

        Returns:
            str: The current or new path to the database file.
        """
        if path is None:
            return self._vault_path
        self._vault_path = Path(path).resolve()
        base_path = Path(path).resolve()
        if base_path.is_file() or base_path.suffix:
            base_path = base_path.parent
        base_path.mkdir(parents=True, exist_ok=True)
        self._base_path = base_path
        return str(self._vault_path)
Set or get the path to the database file.
If no path is provided, the current path is returned. If a path is provided, it is set as the new path. The function also creates any missing directories: the parent directory when the path points to a file (or has a file suffix), otherwise the path itself.
Parameters: path (str): The new path to the database file. If not provided, the current path is returned.
Returns: str: The current or new path to the database file.
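The directory-selection rule used when setting the path can be sketched on its own. The helper name `base_dir_for` and the sample paths are hypothetical, and the sketch deliberately skips the `mkdir` call so that it has no side effects on disk.

```python
from pathlib import Path


def base_dir_for(path: str) -> Path:
    """Return the directory that would hold the database file.

    A path that is an existing file, or that has a suffix such as
    '.camel', is treated as a file, so its parent is the base directory.
    Anything else is treated as a directory itself.
    """
    p = Path(path).resolve()
    if p.is_file() or p.suffix:
        return p.parent
    return p


file_case = base_dir_for("zakat_demo_dir/zakat.camel")
dir_case = base_dir_for("zakat_demo_dir")
```

Both calls yield the same directory, which is why a bare directory name and a full file path can be passed interchangeably.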
    def base_path(self, *args) -> str:
        """
        Generate a base path by joining the provided arguments with the existing base path.

        Parameters:
            *args (str): Variable length argument list of strings to be joined with the base path.

        Returns:
            str: The generated base path. If no arguments are provided, the existing base path is returned.
        """
        if not args:
            return str(self._base_path)
        filtered_args = []
        ignored_filename = None
        for arg in args:
            if Path(arg).suffix:
                ignored_filename = arg
            else:
                filtered_args.append(arg)
        base_path = Path(self._base_path)
        full_path = base_path.joinpath(*filtered_args)
        full_path.mkdir(parents=True, exist_ok=True)
        if ignored_filename is not None:
            # Join with the ignored filename; convert to str to match the declared return type
            return str(full_path.resolve() / ignored_filename)
        return str(full_path.resolve())
Generate a base path by joining the provided arguments with the existing base path.
Parameters: *args (str): Variable length argument list of strings to be joined with the base path.
Returns: str: The generated base path. If no arguments are provided, the existing base path is returned.
    @staticmethod
    def scale(x: float | int | Decimal, decimal_places: int = 2) -> int:
        """
        Scales a numerical value by a specified power of 10, returning an integer.

        This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and
        facilitate precise scaling operations, particularly useful in financial or scientific calculations.

        Parameters:
            x: The numeric value to scale. Can be a floating-point number, integer, or decimal.
            decimal_places: The exponent for the scaling factor (10**decimal_places). Defaults to 2,
                meaning the input is scaled by a factor of 100 (e.g., converts 1.23 to 123).

        Returns:
            The scaled value as an integer; the input is first rounded to `decimal_places` digits.

        Raises:
            TypeError: If the input `x` is not a valid numeric type.

        Examples:
            >>> ZakatTracker.scale(3.14159)
            314
            >>> ZakatTracker.scale(1234, decimal_places=3)
            1234000
            >>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4)
            50
        """
        if not isinstance(x, (float, int, Decimal)):
            raise TypeError("Input 'x' must be a float, int, or Decimal.")
        return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))
Scales a numerical value by a specified power of 10, returning an integer.
This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and facilitate precise scaling operations, particularly useful in financial or scientific calculations.
Parameters:
    x: The numeric value to scale. Can be a floating-point number, integer, or decimal.
    decimal_places: The exponent for the scaling factor (10**decimal_places). Defaults to 2, meaning the input is scaled by a factor of 100 (e.g., converts 1.23 to 123).
Returns: The scaled value as an integer; the input is first rounded to `decimal_places` digits.
Raises:
    TypeError: If the input `x` is not a valid numeric type.
Examples:
>>> ZakatTracker.scale(3.14159)
314
>>> ZakatTracker.scale(1234, decimal_places=3)
1234000
>>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4)
50
    @staticmethod
    def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal:
        """
        Unscales an integer by a power of 10.

        Parameters:
            x: The integer to unscale.
            return_type: The desired type for the returned value. Can be float or Decimal. Defaults to float.
            decimal_places: The power of 10 to use. Defaults to 2.

        Returns:
            The unscaled number, converted to the specified return_type.

        Raises:
            TypeError: If the return_type is not float or Decimal.
        """
        if return_type not in (float, Decimal):
            raise TypeError(f'Invalid return_type({return_type}). Supported types are float and Decimal.')
        return round(return_type(x / (10 ** decimal_places)), decimal_places)
Unscales an integer by a power of 10.
Parameters:
    x: The integer to unscale.
    return_type: The desired type for the returned value. Can be float or Decimal (int is rejected by the type check). Defaults to float.
    decimal_places: The power of 10 to use. Defaults to 2.
Returns: The unscaled number, converted to the specified return_type.
Raises: TypeError: If the return_type is not float or Decimal.
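The two helpers are inverses of each other for values with at most `decimal_places` digits after the point. The sketch below copies both formulas into standalone functions (module-level `scale` and `unscale` are illustrative stand-ins for the static methods) and checks a round trip, including a float that is not exactly representable.

```python
from decimal import Decimal


def scale(x, decimal_places: int = 2) -> int:
    """Standalone copy of the scaling formula, for illustration only."""
    if not isinstance(x, (float, int, Decimal)):
        raise TypeError("Input 'x' must be a float, int, or Decimal.")
    # Format to a fixed number of digits first, then shift the decimal point.
    return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))


def unscale(x: int, return_type: type = float, decimal_places: int = 2):
    """Standalone copy of the unscaling formula, for illustration only."""
    if return_type not in (float, Decimal):
        raise TypeError("Supported types are float and Decimal.")
    return round(return_type(x / (10 ** decimal_places)), decimal_places)


stored = scale(1000)              # amounts are kept as scaled integers
round_trip = unscale(scale(3.75))
noisy = scale(0.1 + 0.2)          # 0.30000000000000004 is formatted as "0.30"
```

Storing money as scaled integers sidesteps accumulated float error: the float noise in `0.1 + 0.2` disappears once the value is formatted to two decimal places.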
    def reset(self) -> None:
        """
        Reset the internal data structure to its initial state.

        Parameters:
            None

        Returns:
            None
        """
        self._vault = {
            'account': {},
            'exchange': {},
            'history': {},
            'lock': None,
            'report': {},
        }
Reset the internal data structure to its initial state.
Parameters: None
Returns: None
    @staticmethod
    def time(now: datetime = None) -> int:
        """
        Generates a timestamp based on the provided datetime object or the current datetime.

        Parameters:
            now (datetime, optional): The datetime object to generate the timestamp from.
                If not provided, the current datetime is used.

        Returns:
            int: The timestamp in nanoseconds since the Unix epoch (January 1, 1970).
                Dates before 1970 produce negative values, back to the year 1000 AD.
        """
        if now is None:
            now = datetime.datetime.now()
        ordinal_day = now.toordinal()
        ns_in_day = (now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() * 10 ** 9
        return int((ordinal_day - 719_163) * 86_400_000_000_000 + ns_in_day)
Generates a timestamp based on the provided datetime object or the current datetime.
Parameters: now (datetime, optional): The datetime object to generate the timestamp from. If not provided, the current datetime is used.
Returns: int: The timestamp in nanoseconds since the Unix epoch (January 1, 1970). Dates before 1970 produce negative values, back to the year 1000 AD.
    @staticmethod
    def time_to_datetime(ordinal_ns: int) -> datetime:
        """
        Converts a nanosecond timestamp (as produced by `time`) to a datetime object.

        Parameters:
            ordinal_ns (int): The number of nanoseconds since the Unix epoch (1970-01-01).

        Returns:
            datetime: The corresponding datetime object, truncated to whole seconds.
        """
        ordinal_day = ordinal_ns // 86_400_000_000_000 + 719_163
        ns_in_day = ordinal_ns % 86_400_000_000_000
        d = datetime.datetime.fromordinal(ordinal_day)
        t = datetime.timedelta(seconds=ns_in_day // 10 ** 9)
        return datetime.datetime.combine(d, datetime.time()) + t
Converts a nanosecond timestamp (as produced by `time`) to a datetime object.
Parameters: ordinal_ns (int): The number of nanoseconds since the Unix epoch (1970-01-01).
Returns: datetime: The corresponding datetime object.
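The two conversions can be checked against each other with a round trip. The sketch below copies both formulas into standalone functions (`to_ns` and `from_ns` are hypothetical names) and uses a whole-second datetime, because the reverse conversion keeps only whole seconds. The constant 719,163 is the proleptic Gregorian ordinal of 1970-01-01.

```python
import datetime

NS_PER_DAY = 86_400_000_000_000
EPOCH_ORDINAL = 719_163  # datetime.date(1970, 1, 1).toordinal()


def to_ns(now: datetime.datetime) -> int:
    """Standalone copy of the datetime -> nanoseconds formula."""
    ordinal_day = now.toordinal()
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    ns_in_day = (now - midnight).total_seconds() * 10 ** 9
    return int((ordinal_day - EPOCH_ORDINAL) * NS_PER_DAY + ns_in_day)


def from_ns(ordinal_ns: int) -> datetime.datetime:
    """Standalone copy of the nanoseconds -> datetime formula."""
    ordinal_day = ordinal_ns // NS_PER_DAY + EPOCH_ORDINAL
    ns_in_day = ordinal_ns % NS_PER_DAY
    d = datetime.datetime.fromordinal(ordinal_day)
    t = datetime.timedelta(seconds=ns_in_day // 10 ** 9)  # sub-second part is dropped
    return datetime.datetime.combine(d, datetime.time()) + t


sample = datetime.datetime(2024, 6, 25, 12, 30, 15)
stamp = to_ns(sample)
restored = from_ns(stamp)
```

Two caveats follow from the formulas themselves: the datetime is naive, so the result reflects local wall-clock time rather than UTC, and the intermediate float arithmetic means the timestamp should not be relied on for true nanosecond precision.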
    def clean_history(self, lock: int | None = None) -> int:
        """
        Cleans up the history of actions performed on the ZakatTracker instance.

        Parameters:
            lock (int, optional): The lock ID whose empty history record should be removed.
                If not provided, empty history records for all locks are removed.

        Returns:
            int: The number of locks cleaned up.
        """
        count = 0
        if lock in self._vault['history']:
            if len(self._vault['history'][lock]) <= 0:
                count += 1
                del self._vault['history'][lock]
            return count
        self.free(self.lock())
        # Iterate over a snapshot of the keys: deleting from a dict while
        # iterating over it directly raises RuntimeError.
        for lock in list(self._vault['history']):
            if len(self._vault['history'][lock]) <= 0:
                count += 1
                del self._vault['history'][lock]
        return count
Cleans up the history of actions performed on the ZakatTracker instance.
Parameters: lock (int, optional): The lock ID whose empty history record should be removed. If not provided, empty history records for all locks are removed.
Returns: int: The number of locks cleaned up.
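Removing empty entries from a dictionary needs some care, since deleting keys while iterating over the same dictionary can raise `RuntimeError` in Python. The sketch below shows one safe pattern on a plain dictionary. The function name `clean_empty` and the sample history are hypothetical, and the locking steps of the real method are left out.

```python
def clean_empty(history: dict) -> int:
    """Delete entries whose value is empty; return how many were removed."""
    count = 0
    # list(history) takes a snapshot of the keys, so deleting is safe.
    for lock in list(history):
        if len(history[lock]) <= 0:
            del history[lock]
            count += 1
    return count


# Hypothetical history: lock IDs 1 and 3 recorded no actions.
history = {1: [], 2: [{'action': 'TRACK'}], 3: []}
removed = clean_empty(history)
```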
    def nolock(self) -> bool:
        """
        Check if the vault lock is currently not set.

        Returns:
            bool: True if the vault lock is not set, False otherwise.
        """
        return self._vault['lock'] is None
Check if the vault lock is currently not set.
Returns: bool: True if the vault lock is not set, False otherwise.
    def lock(self) -> int:
        """
        Acquires a lock on the ZakatTracker instance.

        Returns:
            int: The lock ID. This ID can be used to release the lock later.
        """
        return self._step()
Acquires a lock on the ZakatTracker instance.
Returns: int: The lock ID. This ID can be used to release the lock later.
    def vault(self) -> dict:
        """
        Returns a copy of the internal vault dictionary.

        This method is used to retrieve the current state of the ZakatTracker object.
        It provides a snapshot of the internal data structure, allowing for further
        processing or analysis.

        Returns:
            dict: A copy of the internal vault dictionary.
        """
        return self._vault.copy()
Returns a shallow copy of the internal vault dictionary.
This method is used to retrieve the current state of the ZakatTracker object. It provides a snapshot of the top-level data structure, allowing for further processing or analysis. Because the copy is made with `dict.copy()`, nested dictionaries are still shared with the tracker.
Returns: dict: A shallow copy of the internal vault dictionary.
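Since the source uses `dict.copy()`, only the top-level dictionary is duplicated. The sketch below, with a made-up vault, shows what that means in practice and how `copy.deepcopy` from the standard library gives a fully independent snapshot when one is needed.

```python
import copy

# Hypothetical, heavily simplified vault.
vault = {'account': {'cash': {'balance': 100}}, 'lock': None}

shallow = vault.copy()
shallow['lock'] = 123                       # top-level change: original unaffected
shallow['account']['cash']['balance'] = 0   # nested change: original IS affected

shared_balance = vault['account']['cash']['balance']

deep = copy.deepcopy(vault)
deep['account']['cash']['balance'] = 999    # nested change on a deep copy
independent_balance = vault['account']['cash']['balance']
```

Callers that only read the returned dictionary are fine with the shallow copy. Anything that mutates nested account data should work on a deep copy instead.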
    def stats(self) -> dict[str, tuple]:
        """
        Calculates and returns statistics about the object's data storage.

        This method determines the size of the database file on disk and the
        size of the data currently held in RAM (likely within a dictionary).
        Both sizes are reported in bytes and in a human-readable format
        (e.g., KB, MB).

        Returns:
            dict[str, tuple]: A dictionary containing the following statistics:

                * 'database': A tuple with two elements:
                    - The database file size in bytes (int).
                    - The database file size in human-readable format (str).
                * 'ram': A tuple with two elements:
                    - The RAM usage (dictionary size) in bytes (int).
                    - The RAM usage in human-readable format (str).

        Example:
            >>> stats = my_object.stats()
            >>> print(stats['database'])
            (256000, '250.0 KB')
            >>> print(stats['ram'])
            (12345, '12.1 KB')
        """
        ram_size = self.get_dict_size(self.vault())
        file_size = os.path.getsize(self.path())
        return {
            'database': (file_size, self.human_readable_size(file_size)),
            'ram': (ram_size, self.human_readable_size(ram_size)),
        }
Calculates and returns statistics about the object's data storage.
This method determines the size of the database file on disk and the size of the data currently held in RAM (the in-memory vault dictionary). Both sizes are reported in bytes and in a human-readable format (e.g., KB, MB).
Returns: dict[str, tuple]: A dictionary containing the following statistics:
* 'database': A tuple with two elements:
- The database file size in bytes (int).
- The database file size in human-readable format (str).
* 'ram': A tuple with two elements:
- The RAM usage (dictionary size) in bytes (int).
- The RAM usage in human-readable format (str).
Example:
>>> stats = my_object.stats()
>>> print(stats['database'])
(256000, '250.0 KB')
>>> print(stats['ram'])
(12345, '12.1 KB')
    def files(self) -> list[dict[str, str | int]]:
        """
        Retrieves information about files associated with this class.

        This method provides a standardized way to gather details about
        files used by the class for storage, snapshots, and CSV imports.

        Returns:
            list[dict[str, str | int]]: A list of dictionaries, each containing information
                about a specific file:

                * type (str): The type of file ('database', 'snapshot', 'import_csv').
                * path (str): The full file path.
                * exists (bool): Whether the file exists on the filesystem.
                * size (int): The file size in bytes (0 if the file doesn't exist).
                * human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB').

        Example:
            ```
            file_info = my_object.files()
            for info in file_info:
                print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}")
            ```
        """
        result = []
        for file_type, path in {
            'database': self.path(),
            'snapshot': self.snapshot_cache_path(),
            'import_csv': self.import_csv_cache_path(),
        }.items():
            exists = os.path.exists(path)
            size = os.path.getsize(path) if exists else 0
            human_readable_size = self.human_readable_size(size) if exists else 0
            result.append({
                'type': file_type,
                'path': path,
                'exists': exists,
                'size': size,
                'human_readable_size': human_readable_size,
            })
        return result
Retrieves information about files associated with this class.
This method provides a standardized way to gather details about files used by the class for storage, snapshots, and CSV imports. It is an instance method, so it is called on a tracker object.
Returns: list[dict[str, str | int]]: A list of dictionaries, each containing information about a specific file:
* type (str): The type of file ('database', 'snapshot', 'import_csv').
* path (str): The full file path.
* exists (bool): Whether the file exists on the filesystem.
* size (int): The file size in bytes (0 if the file doesn't exist).
* human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB'); the integer 0 if the file doesn't exist.
Example:
file_info = my_object.files()
for info in file_info:
    print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}")
    def steps(self) -> dict:
        """Returns a copy of the history of steps taken in the ZakatTracker."""
        return self._vault['history'].copy()
Returns a copy of the history of steps taken in the ZakatTracker.
The history is a dictionary where each key is a unique identifier for a step, and the corresponding value is a dictionary containing information about the step.
Returns: dict: A copy of the history of steps taken in the ZakatTracker.
    def free(self, lock: int, auto_save: bool = True) -> bool:
        """Releases the lock on the database."""
        if lock == self._vault['lock']:
            self._vault['lock'] = None
            self.clean_history(lock)
            if auto_save:
                return self.save(self.path())
            return True
        return False
Releases the lock on the database.
Parameters:
- lock (int): The lock ID to be released.
- auto_save (bool): Whether to automatically save the database after releasing the lock.
Returns: bool: True if the lock is successfully released and (optionally) saved, False otherwise.
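The acquire/release pattern that the methods in this section use around `free` can be sketched in isolation. This is a simplified stand-in, not the library's implementation: `LockSketch` is a hypothetical class, and it assumes the lock ID is a nanosecond timestamp that `lock()` returns again while the lock is held.

```python
import time


class LockSketch:
    """Hypothetical, in-memory stand-in for the tracker's lock handling."""

    def __init__(self) -> None:
        self._lock: int | None = None

    def nolock(self) -> bool:
        return self._lock is None

    def lock(self) -> int:
        # Acquire a new lock ID, or return the one already held.
        if self._lock is None:
            self._lock = time.time_ns()
        return self._lock

    def free(self, lock: int) -> bool:
        # Only the holder of the matching ID may release the lock.
        if lock == self._lock:
            self._lock = None
            return True
        return False


db = LockSketch()
no_lock = db.nolock()      # remember whether this caller took the lock
lock_id = db.lock()
print(db.free(lock_id + 1))  # False: wrong ID, lock stays held
if no_lock:
    print(db.free(db.lock()))  # True: released by the caller that acquired it
```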
    def account_exists(self, account) -> bool:
        """Check if the given account exists in the vault."""
        return account in self._vault['account']
Check if the given account exists in the vault.
Parameters: account (str): The account number to check.
Returns: bool: True if the account exists, False otherwise.
    def box_size(self, account) -> int:
        """Calculate the size of the box for a specific account."""
        if self.account_exists(account):
            return len(self._vault['account'][account]['box'])
        return -1
Calculate the size of the box for a specific account.
Parameters: account (str): The account number for which the box size needs to be calculated.
Returns: int: The size of the box for the given account. If the account does not exist, -1 is returned.
    def log_size(self, account) -> int:
        """Get the size of the log for a specific account."""
        if self.account_exists(account):
            return len(self._vault['account'][account]['log'])
        return -1
Get the size of the log for a specific account.
Parameters: account (str): The account number for which the log size needs to be calculated.
Returns: int: The size of the log for the given account. If the account does not exist, -1 is returned.
    @staticmethod
    def file_hash(file_path: str, algorithm: str = "blake2b") -> str:
        """Calculates the hash of a file using the specified algorithm."""
        hash_obj = hashlib.new(algorithm)  # Create the hash object
        with open(file_path, "rb") as f:  # Open file in binary mode for reading
            for chunk in iter(lambda: f.read(4096), b""):  # Read file in chunks
                hash_obj.update(chunk)
        return hash_obj.hexdigest()  # Return the hash as a hexadecimal string
Calculates the hash of a file using the specified algorithm.
Parameters:
- file_path (str): The path to the file.
- algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b".
Returns: str: The hexadecimal representation of the file's hash.
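To illustrate why reading in chunks is safe, the sketch below hashes a temporary file 4096 bytes at a time and compares the result with hashing the whole content in one call. `file_hash_sketch` and the `chunk_size` parameter are illustrative names, not part of the library.

```python
import hashlib
import os
import tempfile


def file_hash_sketch(file_path: str, algorithm: str = "blake2b", chunk_size: int = 4096) -> str:
    """Hash a file incrementally so it never has to fit in memory at once."""
    hash_obj = hashlib.new(algorithm)
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            hash_obj.update(chunk)
    return hash_obj.hexdigest()


# Write some demo data, then hash it both ways.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"zakat" * 10_000)
    demo_path = tmp.name

chunked = file_hash_sketch(demo_path)
with open(demo_path, "rb") as f:
    whole = hashlib.blake2b(f.read()).hexdigest()
os.remove(demo_path)

print(chunked == whole)  # True: chunked updates give the same digest
```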
    def snapshot_cache_path(self):
        """Generate the path for the cache file used to store snapshots."""
        path = str(self.path())
        ext = self.ext()
        ext_len = len(ext)
        if path.endswith(f'.{ext}'):
            path = path[:-ext_len-1]
        return path + f'.snapshots.{ext}'
Generate the path for the cache file used to store snapshots.
The cache file is a camel file that stores the timestamps of the snapshots. The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel".
Returns: str: The path to the cache file.
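The file-name derivation described above can be tried on plain strings. This is a standalone sketch; `snapshot_cache_path_sketch` and the sample paths are made up for illustration.

```python
def snapshot_cache_path_sketch(db_path: str, ext: str = "camel") -> str:
    """Swap a trailing '.<ext>' for '.snapshots.<ext>' (append it if absent)."""
    suffix = f".{ext}"
    if db_path.endswith(suffix):
        db_path = db_path[: -len(suffix)]
    return f"{db_path}.snapshots.{ext}"


print(snapshot_cache_path_sketch("data/zakat.camel"))  # data/zakat.snapshots.camel
print(snapshot_cache_path_sketch("data/zakat"))        # data/zakat.snapshots.camel
```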
    def snapshot(self) -> bool:
        """This function creates a snapshot of the current database state."""
        current_hash = self.file_hash(self.path())
        cache: dict[str, int] = {}  # hash: time_ns
        try:
            with open(self.snapshot_cache_path(), 'r') as stream:
                cache = camel.load(stream.read())
        except Exception:
            # A missing or unreadable cache file means no snapshots are known yet.
            pass
        if current_hash in cache:
            return True
        time = time_ns()
        cache[current_hash] = time
        if not self.save(self.base_path('snapshots', f'{time}.{self.ext()}')):
            return False
        with open(self.snapshot_cache_path(), 'w') as stream:
            stream.write(camel.dump(cache))
        return True
This function creates a snapshot of the current database state.
The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. If a snapshot with the same hash exists, the function returns True without creating a new snapshot. If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp.
Parameters: None
Returns: bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails.
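The hash-based deduplication described above can be sketched without touching the filesystem. Here two dictionaries stand in for the cache file and the snapshot directory; `snapshot_sketch`, `cache`, and `store` are hypothetical names, and the collision guard on the timestamp is an addition for the demo only.

```python
import hashlib
import time


def snapshot_sketch(data: bytes, cache: dict[str, int], store: dict[int, bytes]) -> bool:
    """Store `data` under a timestamp unless an identical state was already stored."""
    digest = hashlib.blake2b(data).hexdigest()
    if digest in cache:
        return True  # same content already captured, nothing to do
    stamp = time.time_ns()
    while stamp in store:  # demo-only guard against coarse clocks
        stamp += 1
    store[stamp] = data    # stands in for saving '<stamp>.camel'
    cache[digest] = stamp  # stands in for updating the cache file
    return True


demo_cache: dict[str, int] = {}
demo_store: dict[int, bytes] = {}
snapshot_sketch(b"state-1", demo_cache, demo_store)
snapshot_sketch(b"state-1", demo_cache, demo_store)  # duplicate, skipped
snapshot_sketch(b"state-2", demo_cache, demo_store)
print(len(demo_store))  # 2
```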
    def snapshots(self, hide_missing: bool = True, verified_hash_only: bool = False) \
            -> dict[int, tuple[str, str, bool]]:
        """Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status."""
        cache: dict[str, int] = {}  # hash: time_ns
        try:
            with open(self.snapshot_cache_path(), 'r') as stream:
                cache = camel.load(stream.read())
        except Exception:
            # A missing or unreadable cache file is treated as "no snapshots".
            pass
        if not cache:
            return {}
        result: dict[int, tuple[str, str, bool]] = {}  # time_ns: (hash, path, exists)
        for file_hash, ref in cache.items():
            path = self.base_path('snapshots', f'{ref}.{self.ext()}')
            exists = os.path.exists(path)
            if verified_hash_only:
                # Check existence first so a missing file is skipped instead of raising while hashing.
                if not exists or self.file_hash(path) != file_hash:
                    continue
            if exists or not hide_missing:
                result[ref] = (file_hash, path, exists)
        return result
Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status.
Parameters:
- hide_missing (bool): If True, only include snapshots whose files still exist on disk. Default is True.
- verified_hash_only (bool): If True, only include snapshots whose file exists and whose current hash matches the recorded hash. Default is False.
Returns:
- dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, and the values are tuples containing the snapshot's hash, path, and existence status.
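How the two flags interact can be seen with in-memory data. In this sketch, `files_on_disk` maps a snapshot timestamp to the hash of the file actually found there; that mapping, the function name, and the sample path format are assumptions used only to avoid real file access.

```python
def list_snapshots_sketch(
    cache: dict[str, int],
    files_on_disk: dict[int, str],
    hide_missing: bool = True,
    verified_hash_only: bool = False,
) -> dict[int, tuple[str, str, bool]]:
    """Return {timestamp: (recorded_hash, path, exists)} after applying both filters."""
    result = {}
    for recorded_hash, stamp in cache.items():
        exists = stamp in files_on_disk
        if verified_hash_only and (not exists or files_on_disk[stamp] != recorded_hash):
            continue  # missing or modified snapshot
        if exists or not hide_missing:
            result[stamp] = (recorded_hash, f"snapshots/{stamp}.camel", exists)
    return result


cache = {"aaa": 1, "bbb": 2, "ccc": 3}
files_on_disk = {1: "aaa", 2: "tampered"}  # snapshot 3 is missing, 2 was altered

print(sorted(list_snapshots_sketch(cache, files_on_disk)))                           # [1, 2]
print(sorted(list_snapshots_sketch(cache, files_on_disk, hide_missing=False)))       # [1, 2, 3]
print(sorted(list_snapshots_sketch(cache, files_on_disk, verified_hash_only=True)))  # [1]
```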
    def recall(self, dry=True, debug=False) -> bool:
        """Revert the last operation."""
        if not self.nolock() or len(self._vault['history']) == 0:
            return False
        ref = sorted(self._vault['history'].keys())[-1]
        if debug:
            print('recall', ref)
        memory = self._vault['history'][ref]
        if debug:
            print(type(memory), 'memory', memory)
        limit = len(memory) + 1
        sub_positive_log_negative = 0
        # Walk the recorded steps from newest to oldest and undo each one.
        for i in range(-1, -limit, -1):
            x = memory[i]
            if debug:
                print(type(x), x)
            match x['action']:
                case Action.CREATE:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if debug:
                                print('account', self._vault['account'][x['account']])
                            assert len(self._vault['account'][x['account']]['box']) == 0
                            assert self._vault['account'][x['account']]['balance'] == 0
                            assert self._vault['account'][x['account']]['count'] == 0
                            if dry:
                                continue
                            del self._vault['account'][x['account']]

                case Action.TRACK:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if dry:
                                continue
                            self._vault['account'][x['account']]['balance'] -= x['value']
                            self._vault['account'][x['account']]['count'] -= 1
                            del self._vault['account'][x['account']]['box'][x['ref']]

                case Action.LOG:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['log']:
                                if dry:
                                    continue
                                if sub_positive_log_negative == -x['value']:
                                    self._vault['account'][x['account']]['count'] -= 1
                                    sub_positive_log_negative = 0
                                box_ref = self._vault['account'][x['account']]['log'][x['ref']]['ref']
                                if box_ref is not None:
                                    assert self.box_exists(x['account'], box_ref)
                                    box_value = self._vault['account'][x['account']]['log'][x['ref']]['value']
                                    assert box_value < 0

                                    try:
                                        self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value
                                    except TypeError:
                                        self._vault['account'][x['account']]['box'][box_ref]['rest'] += Decimal(
                                            -box_value)

                                    try:
                                        self._vault['account'][x['account']]['balance'] += -box_value
                                    except TypeError:
                                        self._vault['account'][x['account']]['balance'] += Decimal(-box_value)

                                    self._vault['account'][x['account']]['count'] -= 1
                                del self._vault['account'][x['account']]['log'][x['ref']]

                case Action.SUB:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['box']:
                                if dry:
                                    continue
                                self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value']
                                self._vault['account'][x['account']]['balance'] += x['value']
                                sub_positive_log_negative = x['value']

                case Action.ADD_FILE:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['log']:
                                if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']:
                                    if dry:
                                        continue
                                    del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']]

                case Action.REMOVE_FILE:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['log']:
                                if dry:
                                    continue
                                self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value']

                case Action.BOX_TRANSFER:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['box']:
                                if dry:
                                    continue
                                self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value']

                case Action.EXCHANGE:
                    if x['account'] is not None:
                        if x['account'] in self._vault['exchange']:
                            if x['ref'] in self._vault['exchange'][x['account']]:
                                if dry:
                                    continue
                                del self._vault['exchange'][x['account']][x['ref']]

                case Action.REPORT:
                    if x['ref'] in self._vault['report']:
                        if dry:
                            continue
                        del self._vault['report'][x['ref']]

                case Action.ZAKAT:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['box']:
                                if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]:
                                    if dry:
                                        continue
                                    match x['math']:
                                        case MathOperation.ADDITION:
                                            self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x[
                                                'value']
                                        case MathOperation.EQUAL:
                                            self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value']
                                        case MathOperation.SUBTRACTION:
                                            self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x[
                                                'value']

        if not dry:
            del self._vault['history'][ref]
        return True
Revert the last operation.
Parameters:
- dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True.
- debug (bool): If True, the function will print debug information. Default is False.
Returns: bool: True if the operation was successful, False otherwise.
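The undo mechanism can be reduced to a small sketch: each operation records its steps under one history key, and reverting replays the newest entry backwards. Everything here (`recall_sketch`, the string action names, the flat `balances` dictionary) is a simplified assumption and covers only two of the library's action types.

```python
def recall_sketch(balances: dict, history: dict, dry: bool = True) -> bool:
    """Undo the most recent history entry; with dry=True nothing is modified."""
    if not history:
        return False
    ref = max(history)                  # newest entry has the largest key
    for step in reversed(history[ref]):  # undo steps in reverse order
        if dry:
            continue
        if step["action"] == "TRACK":
            balances[step["account"]] -= step["value"]
        elif step["action"] == "CREATE":
            del balances[step["account"]]
    if not dry:
        del history[ref]
    return True


balances = {"cash": 100}
history = {
    1: [
        {"action": "CREATE", "account": "cash"},
        {"action": "TRACK", "account": "cash", "value": 100},
    ]
}

print(recall_sketch(balances, history), balances)             # True {'cash': 100}
print(recall_sketch(balances, history, dry=False), balances)  # True {}
print(recall_sketch(balances, history, dry=False))            # False: nothing left to undo
```

Defaulting `dry` to True, as the documented method does, means a caller has to opt in explicitly before any data is changed.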
    def ref_exists(self, account: str, ref_type: str, ref: int) -> bool:
        """Check if a specific reference (transaction) exists in the vault for a given account and reference type."""
        if account in self._vault['account']:
            return ref in self._vault['account'][account][ref_type]
        return False
Check if a specific reference (transaction) exists in the vault for a given account and reference type.
Parameters:
- account (str): The account number for which to check the existence of the reference.
- ref_type (str): The type of reference (e.g., 'box', 'log', etc.).
- ref (int): The reference (transaction) number to check for existence.
Returns: bool: True if the reference exists for the given account and reference type, False otherwise.
    def box_exists(self, account: str, ref: int) -> bool:
        """Check if a specific box (transaction) exists in the vault for a given account and reference."""
        return self.ref_exists(account, 'box', ref)
Check if a specific box (transaction) exists in the vault for a given account and reference.
Parameters:
- account (str): The account number for which to check the existence of the box.
- ref (int): The reference (transaction) number to check for existence.
Returns:
- bool: True if the box exists for the given account and reference, False otherwise.
    def track(self, unscaled_value: float | int | Decimal = 0, desc: str = '', account: str = 1, logging: bool = True,
              created: int = None,
              debug: bool = False) -> int:
        """This function tracks a transaction for a specific account."""
        if debug:
            print('track', f'unscaled_value={unscaled_value}, debug={debug}')
        if created is None:
            created = self.time()
        no_lock = self.nolock()
        self.lock()
        if not self.account_exists(account):
            if debug:
                print(f"account {account} created")
            self._vault['account'][account] = {
                'balance': 0,
                'box': {},
                'count': 0,
                'log': {},
                'hide': False,
                'zakatable': True,
            }
            self._step(Action.CREATE, account)
        if unscaled_value == 0:
            if no_lock:
                self.free(self.lock())
            return 0
        value = self.scale(unscaled_value)
        if logging:
            self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug)
        if debug:
            print('create-box', created)
        if self.box_exists(account, created):
            raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).")
        if debug:
            print('created-box', created)
        self._vault['account'][account]['box'][created] = {
            'capital': value,
            'count': 0,
            'last': 0,
            'rest': value,
            'total': 0,
        }
        self._step(Action.TRACK, account, ref=created, value=value)
        if no_lock:
            self.free(self.lock())
        return created
This function tracks a transaction for a specific account.
Parameters:
- unscaled_value (float | int | Decimal): The value of the transaction. Default is 0.
- desc (str): The description of the transaction. Default is an empty string.
- account (str): The account for which the transaction is being tracked. Default is 1 (an integer in the signature, despite the str annotation).
- logging (bool): Whether to log the transaction. Default is True.
- created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None.
- debug (bool): Whether to print debug information. Default is False.
Returns: int: The timestamp of the transaction, or 0 if the value is 0 (no box is created in that case).
This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box.
Raises:
- ValueError: The log transaction happened again in the same nanosecond time.
- ValueError: The box transaction happened again in the same nanosecond time.
    def log_exists(self, account: str, ref: int) -> bool:
        """Checks if a specific transaction log entry exists for a given account."""
        return self.ref_exists(account, 'log', ref)
Checks if a specific transaction log entry exists for a given account.
Parameters:
- account (str): The account number associated with the transaction log.
- ref (int): The reference to the transaction log entry.
Returns: bool: True if the transaction log entry exists, False otherwise.
    def exchange(self, account, created: int = None, rate: float = None, description: str = None,
                 debug: bool = False) -> dict:
        """This method is used to record or retrieve exchange rates for a specific account."""
        if debug:
            print('exchange', f'debug={debug}')
        if created is None:
            created = self.time()
        no_lock = self.nolock()
        self.lock()
        if rate is not None:
            if rate <= 0:
                return dict()
            if account not in self._vault['exchange']:
                self._vault['exchange'][account] = {}
            if len(self._vault['exchange'][account]) == 0 and rate <= 1:
                return {"time": created, "rate": 1, "description": None}
            self._vault['exchange'][account][created] = {"rate": rate, "description": description}
            self._step(Action.EXCHANGE, account, ref=created, value=rate)
            if no_lock:
                self.free(self.lock())
            if debug:
                print("exchange-created-1",
                      f'account: {account}, created: {created}, rate:{rate}, description:{description}')

        if account in self._vault['exchange']:
            valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created]
            if valid_rates:
                latest_rate = max(valid_rates, key=lambda x: x[0])
                if debug:
                    print("exchange-read-1",
                          f'account: {account}, created: {created}, rate:{rate}, description:{description}',
                          'latest_rate', latest_rate)
                result = latest_rate[1]
                result['time'] = latest_rate[0]
                return result  # Return a dictionary containing the rate and description
        if debug:
            print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}')
        return {"time": created, "rate": 1, "description": None}  # Return the default value with an empty description
This method is used to record or retrieve exchange rates for a specific account.
Parameters:
- account (str): The account number for which the exchange rate is being recorded or retrieved.
- created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used.
- rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate.
- description (str): A description of the exchange rate.
- debug (bool): Whether to print debug information. Default is False.
Returns:
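- dict: A dictionary containing the latest exchange rate recorded at or before `created`, its description, and its timestamp under 'time'. If no exchange rate is found, it returns the defaults: a rate of 1 and a description of None. A rate that is zero or negative is rejected and yields an empty dictionary.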
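The retrieval rule (latest rate recorded at or before a given timestamp, otherwise a default of 1) can be sketched on a plain dictionary. `latest_rate_sketch` and the sample rates are illustrative only; unlike the listing above, this sketch returns a copy rather than the stored dictionary.

```python
def latest_rate_sketch(rates: dict[int, dict], created: int) -> dict:
    """Pick the newest rate whose timestamp is <= created, else the default rate 1."""
    valid = [ts for ts in rates if ts <= created]
    if not valid:
        return {"time": created, "rate": 1, "description": None}
    latest = max(valid)
    return {**rates[latest], "time": latest}  # copy, so stored data is untouched


rates = {
    100: {"rate": 3.75, "description": "first"},
    200: {"rate": 3.80, "description": "second"},
}

print(latest_rate_sketch(rates, 150))  # {'rate': 3.75, 'description': 'first', 'time': 100}
print(latest_rate_sketch(rates, 50))   # {'time': 50, 'rate': 1, 'description': None}
```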
    @staticmethod
    def exchange_calc(x: float, x_rate: float, y_rate: float) -> float:
        """This function calculates the exchanged amount of a currency."""
        return (x * x_rate) / y_rate
This function calculates the exchanged amount of a currency.
Args:
- x (float): The original amount of the currency.
- x_rate (float): The exchange rate of the original currency.
- y_rate (float): The exchange rate of the target currency.
Returns: float: The exchanged amount of the target currency.
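A short worked example of the formula `(x * x_rate) / y_rate`. The interpretation of a rate as "value of one unit in a common base currency" and the figure 3.75 are assumptions chosen for the illustration.

```python
def exchange_calc_sketch(x: float, x_rate: float, y_rate: float) -> float:
    """Convert amount x from a currency with rate x_rate to one with rate y_rate."""
    return (x * x_rate) / y_rate


# 100 units of a currency worth 3.75 base units each, expressed in the base currency (rate 1):
to_base = exchange_calc_sketch(100, 3.75, 1)
# Converting that amount back gives the original figure:
back = exchange_calc_sketch(to_base, 1, 3.75)

print(to_base)  # 375.0
print(back)     # 100.0
```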
    def exchanges(self) -> dict:
        """Retrieve the recorded exchange rates for all accounts."""
        return self._vault['exchange'].copy()
Retrieve the recorded exchange rates for all accounts.
Parameters: None
Returns: dict: A dictionary containing all recorded exchange rates. The keys are account names or numbers, and the values are dictionaries containing the exchange rates. Each exchange rate dictionary has timestamps as keys and exchange rate details as values.
    def accounts(self) -> dict:
        """Returns a dictionary containing account numbers as keys and their respective balances as values."""
        result = {}
        for i in self._vault['account']:
            result[i] = self._vault['account'][i]['balance']
        return result
Returns a dictionary containing account numbers as keys and their respective balances as values.
Parameters: None
Returns: dict: A dictionary where keys are account numbers and values are their respective balances.
    def boxes(self, account) -> dict:
        """Retrieve the boxes (transactions) associated with a specific account."""
        if self.account_exists(account):
            return self._vault['account'][account]['box']
        return {}
Retrieve the boxes (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the boxes.
Returns: dict: A dictionary containing the boxes associated with the given account. If the account does not exist, an empty dictionary is returned.
    def logs(self, account) -> dict:
        """Retrieve the logs (transactions) associated with a specific account."""
        if self.account_exists(account):
            return self._vault['account'][account]['log']
        return {}
Retrieve the logs (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the logs.
Returns: dict: A dictionary containing the logs associated with the given account. If the account does not exist, an empty dictionary is returned.
    def add_file(self, account: str, ref: int, path: str) -> int:
        """Adds a file reference to a specific transaction log entry in the vault."""
        if self.account_exists(account):
            if ref in self._vault['account'][account]['log']:
                file_ref = self.time()
                self._vault['account'][account]['log'][ref]['file'][file_ref] = path
                no_lock = self.nolock()
                self.lock()
                self._step(Action.ADD_FILE, account, ref=ref, file=file_ref)
                if no_lock:
                    self.free(self.lock())
                return file_ref
        return 0
Adds a file reference to a specific transaction log entry in the vault.
Parameters:
- account (str): The account number associated with the transaction log.
- ref (int): The reference to the transaction log entry.
- path (str): The path of the file to be added.
Returns: int: The reference of the added file. If the account or transaction log entry does not exist, returns 0.
    def remove_file(self, account: str, ref: int, file_ref: int) -> bool:
        """Removes a file reference from a specific transaction log entry in the vault."""
        if self.account_exists(account):
            if ref in self._vault['account'][account]['log']:
                if file_ref in self._vault['account'][account]['log'][ref]['file']:
                    x = self._vault['account'][account]['log'][ref]['file'][file_ref]
                    del self._vault['account'][account]['log'][ref]['file'][file_ref]
                    no_lock = self.nolock()
                    self.lock()
                    self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x)
                    if no_lock:
                        self.free(self.lock())
                    return True
        return False
Removes a file reference from a specific transaction log entry in the vault.
Parameters:
- account (str): The account number associated with the transaction log.
- ref (int): The reference to the transaction log entry.
- file_ref (int): The reference of the file to be removed.
Returns: bool: True if the file reference is successfully removed, False otherwise.
    def balance(self, account: str = 1, cached: bool = True) -> int:
        """Calculate and return the balance of a specific account."""
        if cached:
            return self._vault['account'][account]['balance']
        # Sum the remaining value of every box; an account with no boxes yields 0.
        return sum(y['rest'] for y in self._vault['account'][account]['box'].values())
Calculate and return the balance of a specific account.
Parameters:
- account (str): The account number. Default is 1 (an integer in the signature, despite the str annotation).
- cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True.
Returns: int: The balance of the account.
Note: If cached is True, the function returns the cached balance. If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items.
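The two ways of obtaining a balance can be compared on a hand-built account record. The record layout follows the `balance` and `box`/`rest` fields described above; the function name and the sample figures are made up for this sketch.

```python
def balance_sketch(account: dict, cached: bool = True):
    """Return the stored balance, or recompute it from the boxes' remaining values."""
    if cached:
        return account["balance"]
    return sum(box["rest"] for box in account["box"].values())


account = {
    "balance": 150,
    "box": {
        1: {"capital": 100, "rest": 100},
        2: {"capital": 80, "rest": 50},  # 30 already spent from this box
    },
}

print(balance_sketch(account))                # 150
print(balance_sketch(account, cached=False))  # 150
```

Comparing the two values is a cheap consistency check: if they differ, the cached figure and the boxes have drifted apart.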
def hide(self, account, status: bool = None) -> bool:
    """
    Check or set the hide status of a specific account.

    Parameters:
        account (str): The account number.
        status (bool, optional): The new hide status. If not provided, the function will return the current status.

    Returns:
        bool: The current or updated hide status of the account.

    Raises:
        None

    Example:
        >>> tracker = ZakatTracker()
        >>> ref = tracker.track(51, 'desc', 'account1')
        >>> tracker.hide('account1')  # Get the current hide status of 'account1' (False by default)
        False
        >>> tracker.hide('account1', True)  # Set the hide status of 'account1' to True
        True
        >>> tracker.hide('account1')  # Get the updated hide status of 'account1'
        True
        >>> tracker.hide('account1', False)
        False
    """
    if self.account_exists(account):
        if status is None:
            return self._vault['account'][account]['hide']
        self._vault['account'][account]['hide'] = status
        return status
    return False
Check or set the hide status of a specific account.
Parameters: account (str): The account number. status (bool, optional): The new hide status. If not provided, the function will return the current status.
Returns: bool: The current or updated hide status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.hide('account1') # Get the current hide status of 'account1' (False by default)
False
>>> tracker.hide('account1', True) # Set the hide status of 'account1' to True
True
>>> tracker.hide('account1') # Get the updated hide status of 'account1'
True
>>> tracker.hide('account1', False)
False
def zakatable(self, account, status: bool = None) -> bool:
    """
    Check or set the zakatable status of a specific account.

    Parameters:
        account (str): The account number.
        status (bool, optional): The new zakatable status. If not provided, the function will return the current status.

    Returns:
        bool: The current or updated zakatable status of the account.

    Raises:
        None

    Example:
        >>> tracker = ZakatTracker()
        >>> ref = tracker.track(51, 'desc', 'account1')
        >>> tracker.zakatable('account1')  # Get the current zakatable status of 'account1' (True by default)
        True
        >>> tracker.zakatable('account1', True)  # Set the zakatable status of 'account1' to True
        True
        >>> tracker.zakatable('account1')  # Get the zakatable status of 'account1' again
        True
        >>> tracker.zakatable('account1', False)
        False
    """
    if self.account_exists(account):
        if status is None:
            return self._vault['account'][account]['zakatable']
        self._vault['account'][account]['zakatable'] = status
        return status
    return False
Check or set the zakatable status of a specific account.
Parameters: account (str): The account number. status (bool, optional): The new zakatable status. If not provided, the function will return the current status.
Returns: bool: The current or updated zakatable status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.zakatable('account1') # Get the current zakatable status of 'account1' (True by default)
True
>>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True
True
>>> tracker.zakatable('account1') # Get the zakatable status of 'account1' again
True
>>> tracker.zakatable('account1', False)
False
def sub(self, unscaled_value: float | int | Decimal, desc: str = '', account: str = 1, created: int = None,
        debug: bool = False) \
        -> tuple[
            int,
            list[
                tuple[int, int],
            ],
        ] | tuple:
    """
    Subtracts a specified value from an account's balance.

    Parameters:
        unscaled_value (float | int | Decimal): The amount to be subtracted.
        desc (str): A description for the transaction. Defaults to an empty string.
        account (str): The account from which the value will be subtracted. Defaults to '1'.
        created (int): The timestamp of the transaction. If not provided, the current timestamp will be used.
        debug (bool): A flag indicating whether to print debug information. Defaults to False.

    Returns:
        tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction.

    If the amount to subtract is greater than the account's balance,
    the remaining amount will be transferred to a new transaction with a negative value.

    Raises:
        ValueError: The box transaction happened again in the same nanosecond time.
        ValueError: The log transaction happened again in the same nanosecond time.
    """
    if debug:
        print('sub', f'debug={debug}')
    if unscaled_value < 0:
        return tuple()
    if unscaled_value == 0:
        ref = self.track(unscaled_value, '', account)
        return ref, ref
    if created is None:
        created = self.time()
    no_lock = self.nolock()
    self.lock()
    self.track(0, '', account)
    value = self.scale(unscaled_value)
    self._log(value=-value, desc=desc, account=account, created=created, ref=None, debug=debug)
    ids = sorted(self._vault['account'][account]['box'].keys())
    limit = len(ids) + 1
    target = value
    if debug:
        print('ids', ids)
    ages = []
    for i in range(-1, -limit, -1):
        if target == 0:
            break
        j = ids[i]
        if debug:
            print('i', i, 'j', j)
        rest = self._vault['account'][account]['box'][j]['rest']
        if rest >= target:
            self._vault['account'][account]['box'][j]['rest'] -= target
            self._step(Action.SUB, account, ref=j, value=target)
            ages.append((j, target))
            target = 0
            break
        elif target > rest > 0:
            chunk = rest
            target -= chunk
            self._step(Action.SUB, account, ref=j, value=chunk)
            ages.append((j, chunk))
            self._vault['account'][account]['box'][j]['rest'] = 0
    if target > 0:
        self.track(
            unscaled_value=self.unscale(-target),
            desc=desc,
            account=account,
            logging=False,
            created=created,
        )
        ages.append((created, target))
    if no_lock:
        self.free(self.lock())
    return created, ages
Subtracts a specified value from an account's balance.
Parameters:
    unscaled_value (float | int | Decimal): The amount to be subtracted.
    desc (str): A description for the transaction. Defaults to an empty string.
    account (str): The account from which the value will be subtracted. Defaults to '1'.
    created (int): The timestamp of the transaction. If not provided, the current timestamp will be used.
    debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: tuple: A tuple containing the timestamp of the transaction and a list of (box reference, amount) tuples, one for each box the value was drawn from. An empty tuple is returned if the value is negative.
If the amount to subtract is greater than the account's balance, the remaining amount will be transferred to a new transaction with a negative value.
Raises: ValueError: The box transaction happened again in the same nanosecond time. ValueError: The log transaction happened again in the same nanosecond time.
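The way `sub` draws the value from the most recent boxes first can be sketched in isolation. The function below is a simplified illustration, not the library's implementation: the name `consume` and the flat `boxes` mapping (timestamp to remaining value) are assumptions made for the example, and scaling, logging, locking and history steps are left out.

```python
def consume(boxes: dict, amount: int, created: int) -> list:
    """Draw `amount` from `boxes` (timestamp -> rest), newest box first.

    Returns (box timestamp, amount taken) pairs, similar in spirit to the
    `ages` list returned by `sub`.
    """
    ages = []
    target = amount
    for ref in sorted(boxes, reverse=True):  # newest timestamp first
        if target == 0:
            break
        rest = boxes[ref]
        if rest <= 0:
            continue  # nothing left to take from this box
        chunk = min(rest, target)
        boxes[ref] -= chunk
        target -= chunk
        ages.append((ref, chunk))
    if target > 0:
        # Not enough funds: record the shortfall as a box with a negative value.
        boxes[created] = -target
        ages.append((created, target))
    return ages


boxes = {100: 40, 200: 25}
ages = consume(boxes, 50, created=300)
print(ages, boxes)
```

Here the newer box (200) is emptied before the older one (100) is touched, so older funds keep their original timestamps for as long as possible.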
1435 def transfer(self, unscaled_amount: float | int | Decimal, from_account: str, to_account: str, desc: str = '', 1436 created: int = None, 1437 debug: bool = False) -> list[int]: 1438 """ 1439 Transfers a specified value from one account to another. 1440 1441 Parameters: 1442 unscaled_amount (float | int | Decimal): The amount to be transferred. 1443 from_account (str): The account from which the value will be transferred. 1444 to_account (str): The account to which the value will be transferred. 1445 desc (str, optional): A description for the transaction. Defaults to an empty string. 1446 created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. 1447 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1448 1449 Returns: 1450 list[int]: A list of timestamps corresponding to the transactions made during the transfer. 1451 1452 Raises: 1453 ValueError: Transfer to the same account is forbidden. 1454 ValueError: The box transaction happened again in the same nanosecond time. 1455 ValueError: The log transaction happened again in the same nanosecond time. 1456 """ 1457 if debug: 1458 print('transfer', f'debug={debug}') 1459 if from_account == to_account: 1460 raise ValueError(f'Transfer to the same account is forbidden. 
{to_account}') 1461 if unscaled_amount <= 0: 1462 return [] 1463 if created is None: 1464 created = self.time() 1465 (_, ages) = self.sub(unscaled_amount, desc, from_account, created, debug=debug) 1466 times = [] 1467 source_exchange = self.exchange(from_account, created) 1468 target_exchange = self.exchange(to_account, created) 1469 1470 if debug: 1471 print('ages', ages) 1472 1473 for age, value in ages: 1474 target_amount = int(self.exchange_calc(value, source_exchange['rate'], target_exchange['rate'])) 1475 if debug: 1476 print('target_amount', target_amount) 1477 # Perform the transfer 1478 if self.box_exists(to_account, age): 1479 if debug: 1480 print('box_exists', age) 1481 capital = self._vault['account'][to_account]['box'][age]['capital'] 1482 rest = self._vault['account'][to_account]['box'][age]['rest'] 1483 if debug: 1484 print( 1485 f"Transfer(loop) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1486 selected_age = age 1487 if rest + target_amount > capital: 1488 self._vault['account'][to_account]['box'][age]['capital'] += target_amount 1489 selected_age = ZakatTracker.time() 1490 self._vault['account'][to_account]['box'][age]['rest'] += target_amount 1491 self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount) 1492 y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account, 1493 created=None, ref=None, debug=debug) 1494 times.append((age, y)) 1495 continue 1496 if debug: 1497 print( 1498 f"Transfer(func) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1499 y = self.track( 1500 unscaled_value=self.unscale(int(target_amount)), 1501 desc=desc, 1502 account=to_account, 1503 logging=True, 1504 created=age, 1505 debug=debug, 1506 ) 1507 times.append(y) 1508 return times
Transfers a specified value from one account to another.
Parameters:
    unscaled_amount (float | int | Decimal): The amount to be transferred.
    from_account (str): The account from which the value will be transferred.
    to_account (str): The account to which the value will be transferred.
    desc (str, optional): A description for the transaction. Defaults to an empty string.
    created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used.
    debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: list[int]: A list of timestamps corresponding to the transactions made during the transfer.
Raises: ValueError: Transfer to the same account is forbidden. ValueError: The box transaction happened again in the same nanosecond time. ValueError: The log transaction happened again in the same nanosecond time.
1510 def check(self, silver_gram_price: float, unscaled_nisab: float | int | Decimal = None, debug: bool = False, now: int = None, 1511 cycle: float = None) -> tuple: 1512 """ 1513 Check the eligibility for Zakat based on the given parameters. 1514 1515 Parameters: 1516 silver_gram_price (float): The price of a gram of silver. 1517 unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, 1518 it will be calculated based on the silver_gram_price. 1519 debug (bool): Flag to enable debug mode. 1520 now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). 1521 cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle(). 1522 1523 Returns: 1524 tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, 1525 and a dictionary containing the Zakat plan. 1526 """ 1527 if debug: 1528 print('check', f'debug={debug}') 1529 if now is None: 1530 now = self.time() 1531 if cycle is None: 1532 cycle = ZakatTracker.TimeCycle() 1533 if unscaled_nisab is None: 1534 unscaled_nisab = ZakatTracker.Nisab(silver_gram_price) 1535 nisab = self.scale(unscaled_nisab) 1536 plan = {} 1537 below_nisab = 0 1538 brief = [0, 0, 0] 1539 valid = False 1540 if debug: 1541 print('exchanges', self.exchanges()) 1542 for x in self._vault['account']: 1543 if not self.zakatable(x): 1544 continue 1545 _box = self._vault['account'][x]['box'] 1546 _log = self._vault['account'][x]['log'] 1547 limit = len(_box) + 1 1548 ids = sorted(self._vault['account'][x]['box'].keys()) 1549 for i in range(-1, -limit, -1): 1550 j = ids[i] 1551 rest = float(_box[j]['rest']) 1552 if rest <= 0: 1553 continue 1554 exchange = self.exchange(x, created=self.time()) 1555 rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1) 1556 brief[0] += rest 1557 index = limit + i - 1 1558 epoch = (now - j) / cycle 1559 if debug: 1560 print(f"Epoch: 
{epoch}", _box[j]) 1561 if _box[j]['last'] > 0: 1562 epoch = (now - _box[j]['last']) / cycle 1563 if debug: 1564 print(f"Epoch: {epoch}") 1565 epoch = floor(epoch) 1566 if debug: 1567 print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch) 1568 if epoch == 0: 1569 continue 1570 if debug: 1571 print("Epoch - PASSED") 1572 brief[1] += rest 1573 if rest >= nisab: 1574 total = 0 1575 for _ in range(epoch): 1576 total += ZakatTracker.ZakatCut(float(rest) - float(total)) 1577 if total > 0: 1578 if x not in plan: 1579 plan[x] = {} 1580 valid = True 1581 brief[2] += total 1582 plan[x][index] = { 1583 'total': total, 1584 'count': epoch, 1585 'box_time': j, 1586 'box_capital': _box[j]['capital'], 1587 'box_rest': _box[j]['rest'], 1588 'box_last': _box[j]['last'], 1589 'box_total': _box[j]['total'], 1590 'box_count': _box[j]['count'], 1591 'box_log': _log[j]['desc'], 1592 'exchange_rate': exchange['rate'], 1593 'exchange_time': exchange['time'], 1594 'exchange_desc': exchange['description'], 1595 } 1596 else: 1597 chunk = ZakatTracker.ZakatCut(float(rest)) 1598 if chunk > 0: 1599 if x not in plan: 1600 plan[x] = {} 1601 if j not in plan[x].keys(): 1602 plan[x][index] = {} 1603 below_nisab += rest 1604 brief[2] += chunk 1605 plan[x][index]['below_nisab'] = chunk 1606 plan[x][index]['total'] = chunk 1607 plan[x][index]['count'] = epoch 1608 plan[x][index]['box_time'] = j 1609 plan[x][index]['box_capital'] = _box[j]['capital'] 1610 plan[x][index]['box_rest'] = _box[j]['rest'] 1611 plan[x][index]['box_last'] = _box[j]['last'] 1612 plan[x][index]['box_total'] = _box[j]['total'] 1613 plan[x][index]['box_count'] = _box[j]['count'] 1614 plan[x][index]['box_log'] = _log[j]['desc'] 1615 plan[x][index]['exchange_rate'] = exchange['rate'] 1616 plan[x][index]['exchange_time'] = exchange['time'] 1617 plan[x][index]['exchange_desc'] = exchange['description'] 1618 valid = valid or below_nisab >= nisab 1619 if debug: 1620 print(f"below_nisab({below_nisab}) >= nisab({nisab})") 
1621 return valid, brief, plan
Check the eligibility for Zakat based on the given parameters.
Parameters:
    silver_gram_price (float): The price of a gram of silver.
    unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, it will be calculated based on the silver_gram_price.
    debug (bool): Flag to enable debug mode.
    now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time().
    cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle().
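Returns: tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of three brief statistics (the total positive balance, the portion held for at least one full cycle, and the total Zakat due, each converted with the account's exchange rate), and a dictionary containing the Zakat plan keyed by account.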
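The per-box calculation inside `check` counts the number of full cycles elapsed and charges each cycle on what remains after the earlier ones. The sketch below isolates that step. The 2.5% rate in `zakat_cut` is an assumption standing in for `ZakatTracker.ZakatCut`, and the function names and integer timestamps are illustrative only.

```python
from math import floor


def zakat_cut(x: float) -> float:
    # Assumption: a 2.5% rate; the library's ZakatCut may differ.
    return 0.025 * x


def zakat_due(rest: float, box_time: int, now: int, cycle: int, last: int = 0) -> tuple:
    """Return (elapsed full cycles, total due) for a single box."""
    # Count from the last Zakat payment if there was one, else from box creation.
    start = last if last > 0 else box_time
    epoch = floor((now - start) / cycle)
    total = 0.0
    for _ in range(epoch):
        # Each cycle is charged on the amount left after the earlier cycles.
        total += zakat_cut(rest - total)
    return epoch, total


epoch, total = zakat_due(rest=1000.0, box_time=0, now=25, cycle=10)
print(epoch, total)
```

With 2.5 cycles elapsed, two full cycles are counted: 25 for the first, then 2.5% of the remaining 975 for the second, giving roughly 49.375 in total.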
def build_payment_parts(self, demand: float, positive_only: bool = True) -> dict:
    """
    Build payment parts for the Zakat distribution.

    Parameters:
        demand (float): The total demand for payment in local currency.
        positive_only (bool): If True, only consider accounts with positive balance. Default is True.

    Returns:
        dict: A dictionary containing the payment parts for each account. The dictionary has the following structure:
        {
            'account': {
                'account_id': {'balance': float, 'rate': float, 'part': float},
                ...
            },
            'exceed': bool,
            'demand': float,
            'total': float,
        }
    """
    total = 0
    parts = {
        'account': {},
        'exceed': False,
        'demand': demand,
    }
    for x, y in self.accounts().items():
        if positive_only and y <= 0:
            continue
        total += float(y)
        exchange = self.exchange(x)
        parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0}
    parts['total'] = total
    return parts
Build payment parts for the Zakat distribution.
Parameters: demand (float): The total demand for payment in local currency. positive_only (bool): If True, only consider accounts with positive balance. Default is True.
Returns: dict: A dictionary containing the payment parts for each account. The dictionary has the following structure:
{
    'account': {
        'account_id': {'balance': float, 'rate': float, 'part': float},
        ...
    },
    'exceed': bool,
    'demand': float,
    'total': float,
}
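As a rough illustration of the returned structure, the sketch below builds the same kind of dictionary from plain inputs. The function `build_parts` and its `balances` and `rates` arguments are hypothetical stand-ins for the tracker's accounts and exchange data; every `part` starts at 0 and is meant to be filled in by the caller.

```python
def build_parts(balances: dict, rates: dict, demand: float, positive_only: bool = True) -> dict:
    """Build a payment-parts dictionary from plain balances and rates."""
    parts = {'account': {}, 'exceed': False, 'demand': demand}
    total = 0.0
    for name, bal in balances.items():
        if positive_only and bal <= 0:
            continue  # skip empty or overdrawn accounts
        total += float(bal)
        # Assumption: accounts without a stored rate use a rate of 1.0.
        parts['account'][name] = {'balance': bal, 'rate': rates.get(name, 1.0), 'part': 0}
    parts['total'] = total
    return parts


parts = build_parts({'cash': 500.0, 'debt': -20.0, 'bank': 1500.0}, {'bank': 1.0}, demand=50.0)
# The caller then decides how much each account contributes.
parts['account']['cash']['part'] = 12.5
parts['account']['bank']['part'] = 37.5
print(parts)
```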
@staticmethod
def check_payment_parts(parts: dict, debug: bool = False) -> int:
    """
    Checks the validity of payment parts.

    Parameters:
        parts (dict): A dictionary containing payment parts information.
        debug (bool): Flag to enable debug mode.

    Returns:
        int: Returns 0 if the payment parts are valid, otherwise returns the error code.

    Error Codes:
        1: 'demand', 'account', 'total', or 'exceed' key is missing in parts.
        2: 'balance', 'rate' or 'part' key is missing in parts['account'][x].
        3: 'part' value in parts['account'][x] is less than 0.
        4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0.
        5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value.
        6: The sum of 'part' values in parts['account'] does not match with 'demand' value.
    """
    if debug:
        print('check_payment_parts', f'debug={debug}')
    for i in ['demand', 'account', 'total', 'exceed']:
        if i not in parts:
            return 1
    exceed = parts['exceed']
    for x in parts['account']:
        for j in ['balance', 'rate', 'part']:
            if j not in parts['account'][x]:
                return 2
        if parts['account'][x]['part'] < 0:
            return 3
        if not exceed and parts['account'][x]['balance'] <= 0:
            return 4
    demand = parts['demand']
    z = 0
    for _, y in parts['account'].items():
        if not exceed and y['part'] > y['balance']:
            return 5
        z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1)
    z = round(z, 2)
    demand = round(demand, 2)
    if debug:
        print('check_payment_parts', f'z = {z}, demand = {demand}')
        print('check_payment_parts', type(z), type(demand))
        print('check_payment_parts', z != demand)
        print('check_payment_parts', str(z) != str(demand))
    if z != demand and str(z) != str(demand):
        return 6
    return 0
Checks the validity of payment parts.
Parameters: parts (dict): A dictionary containing payment parts information. debug (bool): Flag to enable debug mode.
Returns: int: Returns 0 if the payment parts are valid, otherwise returns the error code.
Error Codes:
    1: 'demand', 'account', 'total', or 'exceed' key is missing in parts.
    2: 'balance', 'rate' or 'part' key is missing in parts['account'][x].
    3: 'part' value in parts['account'][x] is less than 0.
    4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0.
    5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value.
    6: The sum of 'part' values in parts['account'] does not match with 'demand' value.
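The error codes can be exercised with a small standalone validator. This is a sketch under stated assumptions rather than the library's code: `check_parts` and `to_local` are hypothetical names, and `to_local` assumes that converting a part to local currency means multiplying it by the account's rate.

```python
def to_local(amount: float, rate: float) -> float:
    # Assumption: local value = amount * account rate.
    return amount * rate


def check_parts(parts: dict) -> int:
    """Return 0 if `parts` is valid, otherwise one of the error codes 1..6."""
    for key in ('demand', 'account', 'total', 'exceed'):
        if key not in parts:
            return 1
    exceed = parts['exceed']
    for entry in parts['account'].values():
        for key in ('balance', 'rate', 'part'):
            if key not in entry:
                return 2
        if entry['part'] < 0:
            return 3
        if not exceed and entry['balance'] <= 0:
            return 4
    paid = 0.0
    for entry in parts['account'].values():
        if not exceed and entry['part'] > entry['balance']:
            return 5
        paid += to_local(entry['part'], entry['rate'])
    # Compare at two decimal places, as amounts are money values.
    if round(paid, 2) != round(parts['demand'], 2):
        return 6
    return 0


good = {
    'demand': 50.0, 'total': 2000.0, 'exceed': False,
    'account': {
        'cash': {'balance': 500.0, 'rate': 1.0, 'part': 12.5},
        'bank': {'balance': 1500.0, 'rate': 1.0, 'part': 37.5},
    },
}
print(check_parts(good))
```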
1709 def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool: 1710 """ 1711 Perform Zakat calculation based on the given report and optional parts. 1712 1713 Parameters: 1714 report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. 1715 parts (dict): A dictionary containing the payment parts for the zakat. 1716 debug (bool): A flag indicating whether to print debug information. 1717 1718 Returns: 1719 bool: True if the zakat calculation is successful, False otherwise. 1720 """ 1721 if debug: 1722 print('zakat', f'debug={debug}') 1723 valid, _, plan = report 1724 if not valid: 1725 return valid 1726 parts_exist = parts is not None 1727 if parts_exist: 1728 if self.check_payment_parts(parts, debug=debug) != 0: 1729 return False 1730 if debug: 1731 print('######### zakat #######') 1732 print('parts_exist', parts_exist) 1733 no_lock = self.nolock() 1734 self.lock() 1735 report_time = self.time() 1736 self._vault['report'][report_time] = report 1737 self._step(Action.REPORT, ref=report_time) 1738 created = self.time() 1739 for x in plan: 1740 target_exchange = self.exchange(x) 1741 if debug: 1742 print(plan[x]) 1743 print('-------------') 1744 print(self._vault['account'][x]['box']) 1745 ids = sorted(self._vault['account'][x]['box'].keys()) 1746 if debug: 1747 print('plan[x]', plan[x]) 1748 for i in plan[x].keys(): 1749 j = ids[i] 1750 if debug: 1751 print('i', i, 'j', j) 1752 self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'], 1753 key='last', 1754 math_operation=MathOperation.EQUAL) 1755 self._vault['account'][x]['box'][j]['last'] = created 1756 amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate'])) 1757 self._vault['account'][x]['box'][j]['total'] += amount 1758 self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total', 1759 math_operation=MathOperation.ADDITION) 1760 
self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count'] 1761 self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count', 1762 math_operation=MathOperation.ADDITION) 1763 if not parts_exist: 1764 try: 1765 self._vault['account'][x]['box'][j]['rest'] -= amount 1766 except TypeError: 1767 self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount) 1768 # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest', 1769 # math_operation=MathOperation.SUBTRACTION) 1770 self._log(-float(amount), desc='zakat-زكاة', account=x, created=None, ref=j, debug=debug) 1771 if parts_exist: 1772 for account, part in parts['account'].items(): 1773 if part['part'] == 0: 1774 continue 1775 if debug: 1776 print('zakat-part', account, part['rate']) 1777 target_exchange = self.exchange(account) 1778 amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate']) 1779 self.sub(amount, desc='zakat-part-دفعة-زكاة', account=account, debug=debug) 1780 if no_lock: 1781 self.free(self.lock()) 1782 return True
Perform Zakat calculation based on the given report and optional parts.
Parameters:
    report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan.
    parts (dict): A dictionary containing the payment parts for the zakat.
    debug (bool): A flag indicating whether to print debug information.
Returns: bool: True if the zakat calculation is successful, False otherwise.
def export_json(self, path: str = "data.json") -> bool:
    """
    Exports the current state of the ZakatTracker object to a JSON file.

    Parameters:
        path (str): The path where the JSON file will be saved. Default is "data.json".

    Returns:
        bool: True if the export is successful, False otherwise.

    Raises:
        No specific exceptions are raised by this method.
    """
    with open(path, "w") as file:
        json.dump(self._vault, file, indent=4, cls=JSONEncoder)
        return True
Exports the current state of the ZakatTracker object to a JSON file.
Parameters: path (str): The path where the JSON file will be saved. Default is "data.json".
Returns: bool: True if the export is successful, False otherwise.
Raises: This method raises no exceptions of its own; errors from opening or writing the file (for example, OSError) propagate to the caller.
def save(self, path: str = None) -> bool:
    """
    Saves the ZakatTracker's current state to a camel file.

    This method serializes the internal data (`_vault`).

    Parameters:
        path (str, optional): File path for saving. Defaults to a predefined location.

    Returns:
        bool: True if the save operation is successful, False otherwise.
    """
    if path is None:
        path = self.path()
    with open(f'{path}.tmp', 'w') as stream:
        # first save in tmp file
        stream.write(camel.dump(self._vault))
        # then move tmp file to original location
        shutil.move(f'{path}.tmp', path)
        return True
Saves the ZakatTracker's current state to a camel file.
This method serializes the internal data (`_vault`).
Parameters: path (str, optional): File path for saving. Defaults to a predefined location.
Returns: bool: True if the save operation is successful, False otherwise.
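`save` writes to a temporary file first and then moves it over the target, presumably so that an interrupted write does not leave a half-written database behind. The sketch below shows the same pattern on its own. It uses `json` in place of the camel serializer purely to stay self-contained, and the name `save_state` is illustrative.

```python
import json
import os
import shutil
import tempfile


def save_state(state: dict, path: str) -> bool:
    """Write `state` to `path` by way of a temporary sibling file."""
    tmp_path = f'{path}.tmp'
    # First write the full content to the temporary file.
    with open(tmp_path, 'w', encoding='utf-8') as stream:
        stream.write(json.dumps(state))
    # Then move it into place once the file has been closed.
    shutil.move(tmp_path, path)
    return True


workdir = tempfile.mkdtemp()
target = os.path.join(workdir, 'vault.json')
saved = save_state({'account': {}}, target)
with open(target, encoding='utf-8') as stream:
    restored = json.load(stream)
print(saved, restored)
```

In this sketch the move happens after the `with` block, so the temporary file is flushed and closed before it replaces the target.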
def load(self, path: str = None) -> bool:
    """
    Load the current state of the ZakatTracker object from a camel file.

    Parameters:
        path (str): The path where the camel file is located. If not provided, it will use the default path.

    Returns:
        bool: True if the load operation is successful, False otherwise.
    """
    if path is None:
        path = self.path()
    if os.path.exists(path):
        with open(path, 'r') as stream:
            self._vault = camel.load(stream.read())
            return True
    return False
Load the current state of the ZakatTracker object from a camel file.
Parameters: path (str): The path where the camel file is located. If not provided, it will use the default path.
Returns: bool: True if the load operation is successful, False otherwise.
def import_csv_cache_path(self):
    """
    Generates the cache file path for imported CSV data.

    This function constructs the file path where cached data from CSV imports
    will be stored. The cache file is a camel file (.camel extension) appended
    to the base path of the object.

    Returns:
        str: The full path to the import CSV cache file.

    Example:
        >>> obj = ZakatTracker('/data/reports')
        >>> obj.import_csv_cache_path()
        '/data/reports.import_csv.camel'
    """
    path = str(self.path())
    ext = self.ext()
    ext_len = len(ext)
    if path.endswith(f'.{ext}'):
        path = path[:-ext_len-1]
    return path + f'.import_csv.{ext}'
Generates the cache file path for imported CSV data.
This function constructs the file path where cached data from CSV imports will be stored. The cache file is a camel file (.camel extension) appended to the base path of the object.
Returns: str: The full path to the import CSV cache file.
Example:
>>> obj = ZakatTracker('/data/reports')
>>> obj.import_csv_cache_path()
'/data/reports.import_csv.camel'
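The path derivation can be reproduced as a pure function, which makes the handling of an existing extension easier to see. The name `cache_path` and the default extension `'camel'` are assumptions for this sketch; in the class, the values come from `self.path()` and `self.ext()`.

```python
def cache_path(path: str, ext: str = 'camel') -> str:
    """Derive the CSV import cache path from a database path."""
    # Drop a trailing '.<ext>' so the extension is not repeated.
    if path.endswith(f'.{ext}'):
        path = path[:-len(ext) - 1]
    return f'{path}.import_csv.{ext}'


print(cache_path('/data/reports'))
print(cache_path('/data/zakat.camel'))
```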
1863 def import_csv(self, path: str = 'file.csv', debug: bool = False) -> tuple: 1864 """ 1865 The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system. 1866 1867 Parameters: 1868 path (str): The path to the CSV file. Default is 'file.csv'. 1869 debug (bool): A flag indicating whether to print debug information. 1870 1871 Returns: 1872 tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, 1873 and a dictionary of bad transactions. 1874 1875 Notes: 1876 * Currency Pair Assumption: This function assumes that the exchange rates stored for each account 1877 are appropriate for the currency pairs involved in the conversions. 1878 * The exchange rate for each account is based on the last encountered transaction rate that is not equal 1879 to 1.0 or the previous rate for that account. 1880 * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent 1881 transactions of the same account within the whole imported and existing dataset when doing `check` and 1882 `zakat` operations. 
1883 1884 Example Usage: 1885 The CSV file should have the following format, rate is optional per transaction: 1886 account, desc, value, date, rate 1887 For example: 1888 safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1 1889 """ 1890 if debug: 1891 print('import_csv', f'debug={debug}') 1892 cache: list[int] = [] 1893 try: 1894 with open(self.import_csv_cache_path(), 'r') as stream: 1895 cache = camel.load(stream.read()) 1896 except: 1897 pass 1898 date_formats = [ 1899 "%Y-%m-%d %H:%M:%S", 1900 "%Y-%m-%dT%H:%M:%S", 1901 "%Y-%m-%dT%H%M%S", 1902 "%Y-%m-%d", 1903 ] 1904 created, found, bad = 0, 0, {} 1905 data: dict[int, list] = {} 1906 with open(path, newline='', encoding="utf-8") as f: 1907 i = 0 1908 for row in csv.reader(f, delimiter=','): 1909 i += 1 1910 hashed = hash(tuple(row)) 1911 if hashed in cache: 1912 found += 1 1913 continue 1914 account = row[0] 1915 desc = row[1] 1916 value = float(row[2]) 1917 rate = 1.0 1918 if row[4:5]: # Empty list if index is out of range 1919 rate = float(row[4]) 1920 date: int = 0 1921 for time_format in date_formats: 1922 try: 1923 date = self.time(datetime.datetime.strptime(row[3], time_format)) 1924 break 1925 except: 1926 pass 1927 # TODO: not allowed for negative dates in the future after enhance time functions 1928 if date == 0 or value == 0: 1929 bad[i] = row + ('invalid date',) 1930 continue 1931 if date not in data: 1932 data[date] = [] 1933 data[date].append((i, account, desc, value, date, rate, hashed)) 1934 1935 if debug: 1936 print('import_csv', len(data)) 1937 1938 if bad: 1939 return created, found, bad 1940 1941 for date, rows in sorted(data.items()): 1942 try: 1943 len_rows = len(rows) 1944 if len_rows == 1: 1945 (_, account, desc, value, date, rate, hashed) = rows[0] 1946 if rate > 0: 1947 self.exchange(account, created=date, rate=rate) 1948 if value > 0: 1949 self.track(value, desc, account, True, date) 1950 elif value < 0: 1951 self.sub(-value, desc, account, date) 1952 created += 1 1953 
cache.append(hashed) 1954 continue 1955 if debug: 1956 print('-- Duplicated time detected', date, 'len', len_rows) 1957 print(rows) 1958 print('---------------------------------') 1959 # If records are found at the same time with different accounts in the same amount 1960 # (one positive and the other negative), this indicates it is a transfer. 1961 if len_rows != 2: 1962 raise Exception(f'more than two transactions({len_rows}) at the same time') 1963 (i, account1, desc1, value1, date1, rate1, _) = rows[0] 1964 (j, account2, desc2, value2, date2, rate2, _) = rows[1] 1965 if account1 == account2 or desc1 != desc2 or abs(value1) != abs(value2) or date1 != date2: 1966 raise Exception('invalid transfer') 1967 if rate1 > 0: 1968 self.exchange(account1, created=date1, rate=rate1) 1969 if rate2 > 0: 1970 self.exchange(account2, created=date2, rate=rate2) 1971 values = { 1972 value1: account1, 1973 value2: account2, 1974 } 1975 self.transfer( 1976 unscaled_amount=abs(value1), 1977 from_account=values[min(values.keys())], 1978 to_account=values[max(values.keys())], 1979 desc=desc1, 1980 created=date1, 1981 ) 1982 except Exception as e: 1983 for (i, account, desc, value, date, rate, _) in rows: 1984 bad[i] = (account, desc, value, date, rate, e) 1985 break 1986 with open(self.import_csv_cache_path(), 'w') as stream: 1987 stream.write(camel.dump(cache)) 1988 return created, found, bad
The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system.
Parameters: path (str): The path to the CSV file. Default is 'file.csv'. debug (bool): A flag indicating whether to print debug information.
Returns: tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, and a dictionary of bad transactions.
Notes:
* Currency Pair Assumption: This function assumes that the exchange rates stored for each account are appropriate for the currency pairs involved in the conversions.
* The exchange rate for each account is based on the last encountered transaction rate that is not equal to 1.0 or the previous rate for that account.
* Those rates are merged into the main exchange-rate data and are later used for all subsequent transactions of the same account, across the whole imported and existing dataset, when performing `check` and `zakat` operations.
Example Usage:
The CSV file should have the following format (rate is optional per transaction):
    account, desc, value, date, rate
For example:
    safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1
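The duplicate check in the source listing keys the cache on `hash(tuple(row))`. Python randomizes string hashes per process unless `PYTHONHASHSEED` is fixed, so such values may not match a cache file written by an earlier run. The following is a minimal sketch of a run-independent key; `stable_row_hash` is a hypothetical helper, not part of the library, and it assumes fields never contain the ASCII unit separator.

```python
import csv
import hashlib
import io


def stable_row_hash(row: list[str]) -> str:
    """Return a digest of a CSV row that is identical across interpreter runs."""
    # Assumption: "\x1f" (ASCII unit separator) never occurs inside a field,
    # so joining on it keeps ['a', 'b'] distinct from ['ab'].
    joined = "\x1f".join(row)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


# Parse one sample row the same way import_csv does, then hash it.
sample = "safe-45,Some text,34872,1988-06-30 00:00:00,1\n"
rows = list(csv.reader(io.StringIO(sample), delimiter=","))
digest = stable_row_hash(rows[0])
print(digest)
```

Because the digest depends only on the row's text, a cache written by one process can be compared against rows read by another.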
    @staticmethod
    def human_readable_size(size: float, decimal_places: int = 2) -> str:
        """
        Converts a size in bytes to a human-readable format (e.g., KB, MB, GB).

        This function iterates through progressively larger units of information
        (B, KB, MB, GB, etc.) and divides the input size until it fits within a
        range that can be expressed with a reasonable number before the unit.

        Parameters:
        size (float): The size in bytes to convert.
        decimal_places (int, optional): The number of decimal places to display
            in the result. Defaults to 2.

        Returns:
        str: A string representation of the size in a human-readable format,
            rounded to the specified number of decimal places. For example:
                - "1.50 KB" (1536 bytes)
                - "23.00 MB" (24117248 bytes)
                - "1.23 GB" (1325899906 bytes)
        """
        if type(size) not in (float, int):
            raise TypeError("size must be a float or integer")
        if type(decimal_places) != int:
            raise TypeError("decimal_places must be an integer")
        for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
            if size < 1024.0:
                break
            size /= 1024.0
        return f"{size:.{decimal_places}f} {unit}"
Converts a size in bytes to a human-readable format (e.g., KB, MB, GB).
This function iterates through progressively larger units of information (B, KB, MB, GB, etc.) and divides the input size until it fits within a range that can be expressed with a reasonable number before the unit.
Parameters: size (float): The size in bytes to convert. decimal_places (int, optional): The number of decimal places to display in the result. Defaults to 2.
Returns: str: A string representation of the size in a human-readable format, rounded to the specified number of decimal places. For example: - "1.50 KB" (1536 bytes) - "23.00 MB" (24117248 bytes) - "1.23 GB" (1325899906 bytes)
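The documented examples can be checked by hand: 1536 / 1024 = 1.5, so the result is "1.50 KB". The sketch below mirrors the loop from the listing in a standalone function so the three examples can be verified without installing the package; the name `format_size` is hypothetical and the type checks are omitted for brevity.

```python
def format_size(size: float, decimal_places: int = 2) -> str:
    """Standalone copy of the unit loop: divide by 1024 until the value fits."""
    unit = "B"
    for unit in ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"]:
        if size < 1024.0:
            break
        size /= 1024.0
    return f"{size:.{decimal_places}f} {unit}"


# The three examples from the docstring, plus a value that stays in bytes.
examples = [format_size(n) for n in (512, 1536, 24117248, 1325899906)]
print(examples)
```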
    @staticmethod
    def get_dict_size(obj: dict, seen: set = None) -> float:
        """
        Recursively calculates the approximate memory size of a dictionary and its contents in bytes.

        This function traverses the dictionary structure, accounting for the size of keys, values,
        and any nested objects. It handles various data types commonly found in dictionaries
        (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case
        of circular references.

        Parameters:
        obj (dict): The dictionary whose size is to be calculated.
        seen (set, optional): A set used internally to track visited objects
                             and avoid circular references. Defaults to None.

        Returns:
            float: An approximate size of the dictionary and its contents in bytes.

        Note:
        - This function is a method of the `ZakatTracker` class and is likely used to
          estimate the memory footprint of data structures relevant to Zakat calculations.
        - The size calculation is approximate as it relies on `sys.getsizeof()`, which might
          not account for all memory overhead depending on the Python implementation.
        - Circular references are handled to prevent infinite recursion.
        - Basic numeric types (int, float, complex) are assumed to have fixed sizes.
        - String sizes are estimated based on character length and encoding.
        """
        size = 0
        if seen is None:
            seen = set()

        obj_id = id(obj)
        if obj_id in seen:
            return 0

        seen.add(obj_id)
        size += sys.getsizeof(obj)

        if isinstance(obj, dict):
            for k, v in obj.items():
                size += ZakatTracker.get_dict_size(k, seen)
                size += ZakatTracker.get_dict_size(v, seen)
        elif isinstance(obj, (list, tuple, set, frozenset)):
            for item in obj:
                size += ZakatTracker.get_dict_size(item, seen)
        elif isinstance(obj, (int, float, complex)):  # Handle numbers
            pass  # Basic numbers have a fixed size, so nothing to add here
        elif isinstance(obj, str):  # Handle strings
            size += len(obj) * sys.getsizeof(str().encode())  # Size per character in bytes
        return size
Recursively calculates the approximate memory size of a dictionary and its contents in bytes.
This function traverses the dictionary structure, accounting for the size of keys, values, and any nested objects. It handles various data types commonly found in dictionaries (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case of circular references.
Parameters: obj (dict): The dictionary whose size is to be calculated. seen (set, optional): A set used internally to track visited objects and avoid circular references. Defaults to None.
Returns: float: An approximate size of the dictionary and its contents in bytes.
Note:
- This function is a method of the `ZakatTracker` class and is likely used to estimate the memory footprint of data structures relevant to Zakat calculations.
- The size calculation is approximate as it relies on `sys.getsizeof()`, which might not account for all memory overhead depending on the Python implementation.
- Circular references are handled to prevent infinite recursion.
- Basic numeric types (int, float, complex) are assumed to have fixed sizes.
- String sizes are estimated based on character length and encoding.
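The circular-reference handling described in the notes can be illustrated with a reduced sketch. It assumes a hypothetical standalone function `approx_size` that keeps only the `seen`-set traversal and deliberately drops the extra per-character string term from the listing, so its numbers will differ from the library's.

```python
import sys


def approx_size(obj, seen=None) -> int:
    """Sum sys.getsizeof over a container graph, visiting each object once."""
    if seen is None:
        seen = set()
    if id(obj) in seen:
        return 0  # already counted (shared or circular reference)
    seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        for key, value in obj.items():
            size += approx_size(key, seen) + approx_size(value, seen)
    elif isinstance(obj, (list, tuple, set, frozenset)):
        for item in obj:
            size += approx_size(item, seen)
    return size


# A dictionary that contains itself: the traversal still terminates.
cyclic = {"name": "vault"}
cyclic["self"] = cyclic
cyclic_size = approx_size(cyclic)

# The same list referenced twice is only counted once.
shared = [0] * 1000
once = approx_size({"a": shared})
twice = approx_size({"a": shared, "b": shared})
print(cyclic_size, once, twice)
```

Tracking `id(obj)` rather than the object itself is what allows unhashable containers such as dicts and lists to be remembered.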
    @staticmethod
    def duration_from_nanoseconds(ns: int,
                                  show_zeros_in_spoken_time: bool = False,
                                  spoken_time_separator=',',
                                  millennia: str = 'Millennia',
                                  century: str = 'Century',
                                  years: str = 'Years',
                                  days: str = 'Days',
                                  hours: str = 'Hours',
                                  minutes: str = 'Minutes',
                                  seconds: str = 'Seconds',
                                  milli_seconds: str = 'MilliSeconds',
                                  micro_seconds: str = 'MicroSeconds',
                                  nano_seconds: str = 'NanoSeconds',
                                  ) -> tuple:
        """
        REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106
        Convert nanoseconds to a human-readable time format.
        A nanosecond is a unit of time in the International System of Units (SI) equal
        to one billionth (0.000000001 or 10^-9 or 1⁄1,000,000,000) of a second.
        Its symbol is ns.
        A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond.

        INPUT : ns (AKA: NanoSeconds)
        OUTPUT: tuple(string time_lapsed, string spoken_time) like format.
        OUTPUT Variables: time_lapsed, spoken_time

        Example Input: duration_from_nanoseconds(ns)
        **"Millennium:Century:Years:Days:Hours:Minutes:Seconds::MilliSeconds::MicroSeconds::NanoSeconds"**
        Example Output: ('039:0001:047:325:05:02:03::456::789::012', ' 39 Millennia,   1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds')
        duration_from_nanoseconds(1234567890123456789012)
        """
        us, ns = divmod(ns, 1000)
        ms, us = divmod(us, 1000)
        s, ms = divmod(ms, 1000)
        m, s = divmod(s, 60)
        h, m = divmod(m, 60)
        d, h = divmod(h, 24)
        y, d = divmod(d, 365)
        c, y = divmod(y, 100)
        n, c = divmod(c, 10)
        time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}"
        spoken_time_part = []
        if n > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{n: 3d} {millennia}")
        if c > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{c: 4d} {century}")
        if y > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{y: 3d} {years}")
        if d > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{d: 4d} {days}")
        if h > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{h: 2d} {hours}")
        if m > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{m: 2d} {minutes}")
        if s > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{s: 2d} {seconds}")
        if ms > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{ms: 3d} {milli_seconds}")
        if us > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{us: 3d} {micro_seconds}")
        if ns > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{ns: 3d} {nano_seconds}")
        return time_lapsed, spoken_time_separator.join(spoken_time_part)
REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106
Convert nanoseconds to a human-readable time format. A nanosecond is a unit of time in the International System of Units (SI) equal to one billionth (0.000000001 or 10^-9 or 1⁄1,000,000,000) of a second. Its symbol is ns. A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond.
INPUT : ns (AKA: NanoSeconds)
OUTPUT: tuple(string time_lapsed, string spoken_time) like format.
OUTPUT Variables: time_lapsed, spoken_time
Example Input: duration_from_nanoseconds(1234567890123456789012)
Format: "Millennium:Century:Years:Days:Hours:Minutes:Seconds::MilliSeconds::MicroSeconds::NanoSeconds"
Example Output: ('039:0001:047:325:05:02:03::456::789::012', ' 39 Millennia,   1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds')
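The conversion itself is a chain of `divmod` calls, each peeling off one unit. The sketch below reproduces only that chain in a hypothetical helper `split_nanoseconds` (the dictionary keys are illustrative names, not the library's) and applies it to the example input from the docstring. As in the listing, a year is treated as 365 days.

```python
def split_nanoseconds(ns: int) -> dict[str, int]:
    """Break a nanosecond count into units using successive divmod calls."""
    us, ns = divmod(ns, 1000)   # nanoseconds  -> microseconds
    ms, us = divmod(us, 1000)   # microseconds -> milliseconds
    s, ms = divmod(ms, 1000)    # milliseconds -> seconds
    m, s = divmod(s, 60)        # seconds      -> minutes
    h, m = divmod(m, 60)        # minutes      -> hours
    d, h = divmod(h, 24)        # hours        -> days
    y, d = divmod(d, 365)       # days         -> years (365-day years)
    c, y = divmod(y, 100)       # years        -> centuries
    n, c = divmod(c, 10)        # centuries    -> millennia
    return {
        "millennia": n, "centuries": c, "years": y, "days": d,
        "hours": h, "minutes": m, "seconds": s,
        "milliseconds": ms, "microseconds": us, "nanoseconds": ns,
    }


parts = split_nanoseconds(1234567890123456789012)
print(parts)
```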
    @staticmethod
    def day_to_time(day: int, month: int = 6, year: int = 2024) -> int:  # assumes the month is June and the year is 2024
        """
        Convert a specific day, month, and year into a timestamp.

        Parameters:
        day (int): The day of the month.
        month (int): The month of the year. Default is 6 (June).
        year (int): The year. Default is 2024.

        Returns:
        int: The timestamp representing the given day, month, and year.

        Note:
        This method assumes the default month and year if not provided.
        """
        return ZakatTracker.time(datetime.datetime(year, month, day))
Convert a specific day, month, and year into a timestamp.
Parameters: day (int): The day of the month. month (int): The month of the year. Default is 6 (June). year (int): The year. Default is 2024.
Returns: int: The timestamp representing the given day, month, and year.
Note: This method assumes the default month and year if not provided.
    @staticmethod
    def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime:
        """
        Generate a random date between two given dates.

        Parameters:
        start_date (datetime.datetime): The start date from which to generate a random date.
        end_date (datetime.datetime): The end date until which to generate a random date.

        Returns:
        datetime.datetime: A random date between the start_date and end_date.
        """
        time_between_dates = end_date - start_date
        days_between_dates = time_between_dates.days
        random_number_of_days = random.randrange(days_between_dates)
        return start_date + datetime.timedelta(days=random_number_of_days)
Generate a random date between two given dates.
Parameters: start_date (datetime.datetime): The start date from which to generate a random date. end_date (datetime.datetime): The end date until which to generate a random date.
Returns: datetime.datetime: A random date between the start_date and end_date.
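Two properties follow from the listing: the offset is a whole number of days, so the result keeps the time of day of `start_date`, and `end_date` itself is never returned because `random.randrange` excludes its upper bound. A minimal sketch under those observations, using a hypothetical standalone function `random_date` with an added guard (the guard is an assumption, not library behavior; `randrange` would otherwise raise its own `ValueError` for an empty range):

```python
import datetime
import random


def random_date(start_date: datetime.datetime,
                end_date: datetime.datetime) -> datetime.datetime:
    """Pick start_date plus a random whole number of days, below end_date."""
    days_between = (end_date - start_date).days
    if days_between <= 0:
        # Added guard for illustration: report an empty range clearly.
        raise ValueError("end_date must be at least one day after start_date")
    return start_date + datetime.timedelta(days=random.randrange(days_between))


start = datetime.datetime(2023, 1, 1)
end = datetime.datetime(2023, 12, 31)
samples = [random_date(start, end) for _ in range(100)]
print(min(samples), max(samples))
```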
    @staticmethod
    def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False,
                                 debug: bool = False) -> int:
        """
        Generate a random CSV file with specified parameters.

        Parameters:
        path (str): The path where the CSV file will be saved. Default is "data.csv".
        count (int): The number of rows to generate in the CSV file. Default is 1000.
        with_rate (bool): If True, a random rate between 0.12 and 12.0 (a multiple of 0.12) is added. Default is False.
        debug (bool): A flag indicating whether to print debug information.

        Returns:
        int: The number of rows written to the CSV file at the specified path.
            Each row contains a randomly generated account, description, value, and date.
            The value is randomly generated between 1000 and 100000,
            and the date is randomly generated between 1000-01-01 and 2023-12-31.
            If the row number is not divisible by 13, the value is multiplied by -1.
        """
        if debug:
            print('generate_random_csv_file', f'debug={debug}')
        written = 0  # rows actually written (stays 0 when count <= 0)
        with open(path, "w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            for i in range(count):
                account = f"acc-{random.randint(1, 1000)}"
                desc = f"Some text {random.randint(1, 1000)}"
                value = random.randint(1000, 100000)
                date = ZakatTracker.generate_random_date(datetime.datetime(1000, 1, 1),
                                                         datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S")
                if not i % 13 == 0:
                    value *= -1
                row = [account, desc, value, date]
                if with_rate:
                    rate = random.randint(1, 100) * 0.12
                    if debug:
                        print('before-append', row)
                    row.append(rate)
                    if debug:
                        print('after-append', row)
                writer.writerow(row)
                written += 1
        return written
Generate a random CSV file with specified parameters.
Parameters: path (str): The path where the CSV file will be saved. Default is "data.csv". count (int): The number of rows to generate in the CSV file. Default is 1000. with_rate (bool): If True, a random rate between 0.12 and 12.0 (a multiple of 0.12) is added. Default is False. debug (bool): A flag indicating whether to print debug information.
Returns: int: The number of rows written to the CSV file at the specified path. Each row contains a randomly generated account, description, value, and date. The value is randomly generated between 1000 and 100000, and the date is randomly generated between 1000-01-01 and 2023-12-31. If the row number is not divisible by 13, the value is multiplied by -1.
    @staticmethod
    def create_random_list(max_sum, min_value=0, max_value=10):
        """
        Creates a list of random integers whose sum does not exceed the specified maximum.

        Args:
            max_sum: The maximum allowed sum of the list elements.
            min_value: The minimum possible value for an element (inclusive).
            max_value: The maximum possible value for an element (inclusive).

        Returns:
            A list of random integers.
        """
        result = []
        current_sum = 0

        while current_sum < max_sum:
            # Calculate the remaining space for the next element
            remaining_sum = max_sum - current_sum
            # Determine the maximum possible value for the next element
            next_max_value = min(remaining_sum, max_value)
            # Generate a random element within the allowed range
            next_element = random.randint(min_value, next_max_value)
            result.append(next_element)
            current_sum += next_element

        return result
Creates a list of random integers whose sum does not exceed the specified maximum.
Args: max_sum: The maximum allowed sum of the list elements. min_value: The minimum possible value for an element (inclusive). max_value: The maximum possible value for an element (inclusive).
Returns: A list of random integers.
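Because each element is capped at the remaining amount and the loop only stops once `current_sum` reaches `max_sum`, the returned list appears to sum to exactly `max_sum` when `min_value` is 0. This is what lets the test routine drain an account in random chunks. The sketch below is a standalone copy of the loop for checking that property; the call mirrors `create_random_list(b_USD_balance, max_value=1000)` from the test listing with the balance written out as a literal.

```python
import random


def create_random_list(max_sum, min_value=0, max_value=10):
    """Standalone copy of the loop: append random values until max_sum is reached."""
    result = []
    current_sum = 0
    while current_sum < max_sum:
        # Never draw more than what is still missing from the target sum.
        next_max_value = min(max_sum - current_sum, max_value)
        next_element = random.randint(min_value, next_max_value)
        result.append(next_element)
        current_sum += next_element
    return result


amounts = create_random_list(50100, max_value=1000)
print(len(amounts), sum(amounts))
```

Note that a `min_value` larger than the remaining amount would make `random.randint` raise `ValueError`, so the default of 0 is the safe choice for this use.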
2475 def test(self, debug: bool = False) -> bool: 2476 if debug: 2477 print('test', f'debug={debug}') 2478 try: 2479 2480 self._test_core(True, debug) 2481 self._test_core(False, debug) 2482 2483 assert self._history() 2484 2485 # Not allowed for duplicate transactions in the same account and time 2486 2487 created = ZakatTracker.time() 2488 self.track(100, 'test-1', 'same', True, created) 2489 failed = False 2490 try: 2491 self.track(50, 'test-1', 'same', True, created) 2492 except: 2493 failed = True 2494 assert failed is True 2495 2496 self.reset() 2497 2498 # Same account transfer 2499 for x in [1, 'a', True, 1.8, None]: 2500 failed = False 2501 try: 2502 self.transfer(1, x, x, 'same-account', debug=debug) 2503 except: 2504 failed = True 2505 assert failed is True 2506 2507 # Always preserve box age during transfer 2508 2509 series: list[tuple] = [ 2510 (30, 4), 2511 (60, 3), 2512 (90, 2), 2513 ] 2514 case = { 2515 3000: { 2516 'series': series, 2517 'rest': 15000, 2518 }, 2519 6000: { 2520 'series': series, 2521 'rest': 12000, 2522 }, 2523 9000: { 2524 'series': series, 2525 'rest': 9000, 2526 }, 2527 18000: { 2528 'series': series, 2529 'rest': 0, 2530 }, 2531 27000: { 2532 'series': series, 2533 'rest': -9000, 2534 }, 2535 36000: { 2536 'series': series, 2537 'rest': -18000, 2538 }, 2539 } 2540 2541 selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle() 2542 2543 for total in case: 2544 if debug: 2545 print('--------------------------------------------------------') 2546 print(f'case[{total}]', case[total]) 2547 for x in case[total]['series']: 2548 self.track( 2549 unscaled_value=x[0], 2550 desc=f"test-{x} ages", 2551 account='ages', 2552 logging=True, 2553 created=selected_time * x[1], 2554 ) 2555 2556 unscaled_total = self.unscale(total) 2557 if debug: 2558 print('unscaled_total', unscaled_total) 2559 refs = self.transfer( 2560 unscaled_amount=unscaled_total, 2561 from_account='ages', 2562 to_account='future', 2563 desc='Zakat Movement', 2564 
debug=debug, 2565 ) 2566 2567 if debug: 2568 print('refs', refs) 2569 2570 ages_cache_balance = self.balance('ages') 2571 ages_fresh_balance = self.balance('ages', False) 2572 rest = case[total]['rest'] 2573 if debug: 2574 print('source', ages_cache_balance, ages_fresh_balance, rest) 2575 assert ages_cache_balance == rest 2576 assert ages_fresh_balance == rest 2577 2578 future_cache_balance = self.balance('future') 2579 future_fresh_balance = self.balance('future', False) 2580 if debug: 2581 print('target', future_cache_balance, future_fresh_balance, total) 2582 print('refs', refs) 2583 assert future_cache_balance == total 2584 assert future_fresh_balance == total 2585 2586 # TODO: check boxes times for `ages` should equal box times in `future` 2587 for ref in self._vault['account']['ages']['box']: 2588 ages_capital = self._vault['account']['ages']['box'][ref]['capital'] 2589 ages_rest = self._vault['account']['ages']['box'][ref]['rest'] 2590 future_capital = 0 2591 if ref in self._vault['account']['future']['box']: 2592 future_capital = self._vault['account']['future']['box'][ref]['capital'] 2593 future_rest = 0 2594 if ref in self._vault['account']['future']['box']: 2595 future_rest = self._vault['account']['future']['box'][ref]['rest'] 2596 if ages_capital != 0 and future_capital != 0 and future_rest != 0: 2597 if debug: 2598 print('================================================================') 2599 print('ages', ages_capital, ages_rest) 2600 print('future', future_capital, future_rest) 2601 if ages_rest == 0: 2602 assert ages_capital == future_capital 2603 elif ages_rest < 0: 2604 assert -ages_capital == future_capital 2605 elif ages_rest > 0: 2606 assert ages_capital == ages_rest + future_capital 2607 self.reset() 2608 assert len(self._vault['history']) == 0 2609 2610 assert self._history() 2611 assert self._history(False) is False 2612 assert self._history() is False 2613 assert self._history(True) 2614 assert self._history() 2615 if debug: 2616 
print('####################################################################') 2617 2618 transaction = [ 2619 ( 2620 20, 'wallet', 1, -2000, -2000, -2000, 1, 1, 2621 2000, 2000, 2000, 1, 1, 2622 ), 2623 ( 2624 750, 'wallet', 'safe', -77000, -77000, -77000, 2, 2, 2625 75000, 75000, 75000, 1, 1, 2626 ), 2627 ( 2628 600, 'safe', 'bank', 15000, 15000, 15000, 1, 2, 2629 60000, 60000, 60000, 1, 1, 2630 ), 2631 ] 2632 for z in transaction: 2633 self.lock() 2634 x = z[1] 2635 y = z[2] 2636 self.transfer( 2637 unscaled_amount=z[0], 2638 from_account=x, 2639 to_account=y, 2640 desc='test-transfer', 2641 debug=debug, 2642 ) 2643 zz = self.balance(x) 2644 if debug: 2645 print(zz, z) 2646 assert zz == z[3] 2647 xx = self.accounts()[x] 2648 assert xx == z[3] 2649 assert self.balance(x, False) == z[4] 2650 assert xx == z[4] 2651 2652 s = 0 2653 log = self._vault['account'][x]['log'] 2654 for i in log: 2655 s += log[i]['value'] 2656 if debug: 2657 print('s', s, 'z[5]', z[5]) 2658 assert s == z[5] 2659 2660 assert self.box_size(x) == z[6] 2661 assert self.log_size(x) == z[7] 2662 2663 yy = self.accounts()[y] 2664 assert self.balance(y) == z[8] 2665 assert yy == z[8] 2666 assert self.balance(y, False) == z[9] 2667 assert yy == z[9] 2668 2669 s = 0 2670 log = self._vault['account'][y]['log'] 2671 for i in log: 2672 s += log[i]['value'] 2673 assert s == z[10] 2674 2675 assert self.box_size(y) == z[11] 2676 assert self.log_size(y) == z[12] 2677 assert self.free(self.lock()) 2678 2679 if debug: 2680 pp().pprint(self.check(2.17)) 2681 2682 assert not self.nolock() 2683 history_count = len(self._vault['history']) 2684 if debug: 2685 print('history-count', history_count) 2686 assert history_count == 4 2687 assert not self.free(ZakatTracker.time()) 2688 assert self.free(self.lock()) 2689 assert self.nolock() 2690 assert len(self._vault['history']) == 3 2691 2692 # storage 2693 2694 _path = self.path(f'test.{self.ext()}') 2695 if os.path.exists(_path): 2696 os.remove(_path) 2697 self.save() 
2698 assert os.path.getsize(_path) > 0 2699 self.reset() 2700 assert self.recall(False, debug) is False 2701 self.load() 2702 assert self._vault['account'] is not None 2703 2704 # recall 2705 2706 assert self.nolock() 2707 assert len(self._vault['history']) == 3 2708 assert self.recall(False, debug) is True 2709 assert len(self._vault['history']) == 2 2710 assert self.recall(False, debug) is True 2711 assert len(self._vault['history']) == 1 2712 assert self.recall(False, debug) is True 2713 assert len(self._vault['history']) == 0 2714 assert self.recall(False, debug) is False 2715 assert len(self._vault['history']) == 0 2716 2717 # exchange 2718 2719 self.exchange("cash", 25, 3.75, "2024-06-25") 2720 self.exchange("cash", 22, 3.73, "2024-06-22") 2721 self.exchange("cash", 15, 3.69, "2024-06-15") 2722 self.exchange("cash", 10, 3.66) 2723 2724 for i in range(1, 30): 2725 exchange = self.exchange("cash", i) 2726 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2727 if debug: 2728 print(i, rate, description, created) 2729 assert created 2730 if i < 10: 2731 assert rate == 1 2732 assert description is None 2733 elif i == 10: 2734 assert rate == 3.66 2735 assert description is None 2736 elif i < 15: 2737 assert rate == 3.66 2738 assert description is None 2739 elif i == 15: 2740 assert rate == 3.69 2741 assert description is not None 2742 elif i < 22: 2743 assert rate == 3.69 2744 assert description is not None 2745 elif i == 22: 2746 assert rate == 3.73 2747 assert description is not None 2748 elif i >= 25: 2749 assert rate == 3.75 2750 assert description is not None 2751 exchange = self.exchange("bank", i) 2752 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2753 if debug: 2754 print(i, rate, description, created) 2755 assert created 2756 assert rate == 1 2757 assert description is None 2758 2759 assert len(self._vault['exchange']) > 0 2760 assert len(self.exchanges()) > 0 2761 
self._vault['exchange'].clear() 2762 assert len(self._vault['exchange']) == 0 2763 assert len(self.exchanges()) == 0 2764 2765 # حفظ أسعار الصرف باستخدام التواريخ بالنانو ثانية 2766 self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25") 2767 self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22") 2768 self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15") 2769 self.exchange("cash", ZakatTracker.day_to_time(10), 3.66) 2770 2771 for i in [x * 0.12 for x in range(-15, 21)]: 2772 if i <= 0: 2773 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0 2774 else: 2775 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0 2776 2777 # اختبار النتائج باستخدام التواريخ بالنانو ثانية 2778 for i in range(1, 31): 2779 timestamp_ns = ZakatTracker.day_to_time(i) 2780 exchange = self.exchange("cash", timestamp_ns) 2781 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2782 if debug: 2783 print(i, rate, description, created) 2784 assert created 2785 if i < 10: 2786 assert rate == 1 2787 assert description is None 2788 elif i == 10: 2789 assert rate == 3.66 2790 assert description is None 2791 elif i < 15: 2792 assert rate == 3.66 2793 assert description is None 2794 elif i == 15: 2795 assert rate == 3.69 2796 assert description is not None 2797 elif i < 22: 2798 assert rate == 3.69 2799 assert description is not None 2800 elif i == 22: 2801 assert rate == 3.73 2802 assert description is not None 2803 elif i >= 25: 2804 assert rate == 3.75 2805 assert description is not None 2806 exchange = self.exchange("bank", i) 2807 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2808 if debug: 2809 print(i, rate, description, created) 2810 assert created 2811 assert rate == 1 2812 assert description is None 2813 2814 # csv 2815 2816 csv_count = 1000 2817 2818 for with_rate, path in { 2819 False: 
'test-import_csv-no-exchange', 2820 True: 'test-import_csv-with-exchange', 2821 }.items(): 2822 2823 if debug: 2824 print('test_import_csv', with_rate, path) 2825 2826 csv_path = path + '.csv' 2827 if os.path.exists(csv_path): 2828 os.remove(csv_path) 2829 c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug) 2830 if debug: 2831 print('generate_random_csv_file', c) 2832 assert c == csv_count 2833 assert os.path.getsize(csv_path) > 0 2834 cache_path = self.import_csv_cache_path() 2835 if os.path.exists(cache_path): 2836 os.remove(cache_path) 2837 self.reset() 2838 (created, found, bad) = self.import_csv(csv_path, debug) 2839 bad_count = len(bad) 2840 assert bad_count > 0 2841 if debug: 2842 print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})") 2843 print('bad', bad) 2844 tmp_size = os.path.getsize(cache_path) 2845 assert tmp_size > 0 2846 # TODO: assert created + found + bad_count == csv_count 2847 # TODO: assert created == csv_count 2848 # TODO: assert bad_count == 0 2849 (created_2, found_2, bad_2) = self.import_csv(csv_path) 2850 bad_2_count = len(bad_2) 2851 if debug: 2852 print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})") 2853 print('bad', bad) 2854 assert bad_2_count > 0 2855 # TODO: assert tmp_size == os.path.getsize(cache_path) 2856 # TODO: assert created_2 + found_2 + bad_2_count == csv_count 2857 # TODO: assert created == found_2 2858 # TODO: assert bad_count == bad_2_count 2859 # TODO: assert found_2 == csv_count 2860 # TODO: assert bad_2_count == 0 2861 # TODO: assert created_2 == 0 2862 2863 # payment parts 2864 2865 positive_parts = self.build_payment_parts(100, positive_only=True) 2866 assert self.check_payment_parts(positive_parts) != 0 2867 assert self.check_payment_parts(positive_parts) != 0 2868 all_parts = self.build_payment_parts(300, positive_only=False) 2869 assert self.check_payment_parts(all_parts) != 0 2870 assert self.check_payment_parts(all_parts) != 0 2871 if debug: 2872 
pp().pprint(positive_parts) 2873 pp().pprint(all_parts) 2874 # dynamic discount 2875 suite = [] 2876 count = 3 2877 for exceed in [False, True]: 2878 case = [] 2879 for parts in [positive_parts, all_parts]: 2880 part = parts.copy() 2881 demand = part['demand'] 2882 if debug: 2883 print(demand, part['total']) 2884 i = 0 2885 z = demand / count 2886 cp = { 2887 'account': {}, 2888 'demand': demand, 2889 'exceed': exceed, 2890 'total': part['total'], 2891 } 2892 j = '' 2893 for x, y in part['account'].items(): 2894 x_exchange = self.exchange(x) 2895 zz = self.exchange_calc(z, 1, x_exchange['rate']) 2896 if exceed and zz <= demand: 2897 i += 1 2898 y['part'] = zz 2899 if debug: 2900 print(exceed, y) 2901 cp['account'][x] = y 2902 case.append(y) 2903 elif not exceed and y['balance'] >= zz: 2904 i += 1 2905 y['part'] = zz 2906 if debug: 2907 print(exceed, y) 2908 cp['account'][x] = y 2909 case.append(y) 2910 j = x 2911 if i >= count: 2912 break 2913 if len(cp['account'][j]) > 0: 2914 suite.append(cp) 2915 if debug: 2916 print('suite', len(suite)) 2917 # vault = self._vault.copy() 2918 for case in suite: 2919 # self._vault = vault.copy() 2920 if debug: 2921 print('case', case) 2922 result = self.check_payment_parts(case) 2923 if debug: 2924 print('check_payment_parts', result, f'exceed: {exceed}') 2925 assert result == 0 2926 2927 report = self.check(2.17, None, debug) 2928 (valid, brief, plan) = report 2929 if debug: 2930 print('valid', valid) 2931 zakat_result = self.zakat(report, parts=case, debug=debug) 2932 if debug: 2933 print('zakat-result', zakat_result) 2934 assert valid == zakat_result 2935 2936 assert self.save(path + f'.{self.ext()}') 2937 assert self.export_json(path + '.json') 2938 2939 assert self.export_json("1000-transactions-test.json") 2940 assert self.save(f"1000-transactions-test.{self.ext()}") 2941 2942 self.reset() 2943 2944 # test transfer between accounts with different exchange rate 2945 2946 a_SAR = "Bank (SAR)" 2947 b_USD = "Bank (USD)" 2948 
            c_SAR = "Safe (SAR)"
            # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer
            for case in [
                (0, a_SAR, "SAR Gift", 1000, 100000),
                (1, a_SAR, 1),
                (0, b_USD, "USD Gift", 500, 50000),
                (1, b_USD, 1),
                (2, b_USD, 3.75),
                (1, b_USD, 3.75),
                (3, 100, b_USD, a_SAR, "100 USD -> SAR", 40000, 137500),
                (0, c_SAR, "Salary", 750, 75000),
                (3, 375, c_SAR, b_USD, "375 SAR -> USD", 37500, 50000),
                (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 137125, 50100),
            ]:
                if debug:
                    print('case', case)
                match (case[0]):
                    case 0:  # track
                        _, account, desc, x, balance = case
                        self.track(unscaled_value=x, desc=desc, account=account, debug=debug)

                        cached_value = self.balance(account, cached=True)
                        fresh_value = self.balance(account, cached=False)
                        if debug:
                            print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value)
                        assert cached_value == balance
                        assert fresh_value == balance
                    case 1:  # check-exchange
                        _, account, expected_rate = case
                        t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug)
                        if debug:
                            print('t-exchange', t_exchange)
                        assert t_exchange['rate'] == expected_rate
                    case 2:  # do-exchange
                        _, account, rate = case
                        self.exchange(account, rate=rate, debug=debug)
                        b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug)
                        if debug:
                            print('b-exchange', b_exchange)
                        assert b_exchange['rate'] == rate
                    case 3:  # transfer
                        _, x, a, b, desc, a_balance, b_balance = case
                        self.transfer(x, a, b, desc, debug=debug)

                        cached_value = self.balance(a, cached=True)
                        fresh_value = self.balance(a, cached=False)
                        if debug:
                            print('account', a, 'cached_value', cached_value, 'fresh_value', fresh_value, 'a_balance', a_balance)
                        assert cached_value == a_balance
                        assert fresh_value == a_balance

                        cached_value = self.balance(b, cached=True)
                        fresh_value = self.balance(b, cached=False)
                        if debug:
                            print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value)
                        assert cached_value == b_balance
                        assert fresh_value == b_balance

            # Transfer all in many chunks randomly from B to A
            a_SAR_balance = 137125
            b_USD_balance = 50100
            b_USD_exchange = self.exchange(b_USD)
            amounts = ZakatTracker.create_random_list(b_USD_balance, max_value=1000)
            if debug:
                print('amounts', amounts)
            i = 0
            for x in amounts:
                if debug:
                    print(f'{i} - transfer-with-exchange({x})')
                self.transfer(
                    unscaled_amount=self.unscale(x),
                    from_account=b_USD,
                    to_account=a_SAR,
                    desc=f"{x} USD -> SAR",
                    debug=debug,
                )

                b_USD_balance -= x
                cached_value = self.balance(b_USD, cached=True)
                fresh_value = self.balance(b_USD, cached=False)
                if debug:
                    print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected',
                          b_USD_balance)
                assert cached_value == b_USD_balance
                assert fresh_value == b_USD_balance

                a_SAR_balance += int(x * b_USD_exchange['rate'])
                cached_value = self.balance(a_SAR, cached=True)
                fresh_value = self.balance(a_SAR, cached=False)
                if debug:
                    print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected',
                          a_SAR_balance, 'rate', b_USD_exchange['rate'])
                assert cached_value == a_SAR_balance
                assert fresh_value == a_SAR_balance
                i += 1

            # Transfer all in many chunks randomly from C to A
            c_SAR_balance = 37500
            amounts = ZakatTracker.create_random_list(c_SAR_balance, max_value=1000)
            if debug:
                print('amounts', amounts)
            i = 0
            for x in amounts:
                if debug:
                    print(f'{i} - transfer-with-exchange({x})')
                self.transfer(
                    unscaled_amount=self.unscale(x),
                    from_account=c_SAR,
                    to_account=a_SAR,
                    desc=f"{x} SAR -> a_SAR",
                    debug=debug,
                )

                c_SAR_balance -= x
                cached_value = self.balance(c_SAR, cached=True)
                fresh_value = self.balance(c_SAR, cached=False)
                if debug:
                    print('account', c_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected',
                          c_SAR_balance)
                assert cached_value == c_SAR_balance
                assert fresh_value == c_SAR_balance

                a_SAR_balance += x
                cached_value = self.balance(a_SAR, cached=True)
                fresh_value = self.balance(a_SAR, cached=False)
                if debug:
                    print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected',
                          a_SAR_balance)
                assert cached_value == a_SAR_balance
                assert fresh_value == a_SAR_balance
                i += 1

            assert self.export_json("accounts-transfer-with-exchange-rates.json")
            assert self.save(f"accounts-transfer-with-exchange-rates.{self.ext()}")

            # check & zakat with exchange rates for many cycles

            for rate, values in {
                1: {
                    'in': [1000, 2000, 10000],
                    'exchanged': [100000, 200000, 1000000],
                    'out': [2500, 5000, 73140],
                },
                3.75: {
                    'in': [200, 1000, 5000],
                    'exchanged': [75000, 375000, 1875000],
                    'out': [1875, 9375, 137138],
                },
            }.items():
                a, b, c = values['in']
                m, n, o = values['exchanged']
                x, y, z = values['out']
                if debug:
                    print('rate', rate, 'values', values)
                for case in [
                    (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [
                        {'safe': {0: {'below_nisab': x}}},
                    ], False, m),
                    (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [
                        {'safe': {0: {'count': 1, 'total': y}}},
                    ], True, n),
                    (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [
                        {'cave': {0: {'count': 3, 'total': z}}},
                    ], True, o),
                ]:
                    if debug:
                        print(f"############# check(rate: {rate}) #############")
                        print('case', case)
                    self.reset()
                    self.exchange(account=case[1], created=case[2], rate=rate)
                    self.track(unscaled_value=case[0], desc='test-check', account=case[1], logging=True, created=case[2])
                    assert self.snapshot()

                    # assert self.nolock()
                    # history_size = len(self._vault['history'])
                    # print('history_size', history_size)
                    # assert history_size == 2
                    assert self.lock()
                    assert not self.nolock()
                    report = self.check(2.17, None, debug)
                    (valid, brief, plan) = report
                    if debug:
                        print('brief', brief)
                    assert valid == case[4]
                    assert case[5] == brief[0]
                    assert case[5] == brief[1]

                    if debug:
                        pp().pprint(plan)

                    for x in plan:
                        assert case[1] == x
                        if 'total' in case[3][0][x][0].keys():
                            assert case[3][0][x][0]['total'] == int(brief[2])
                            assert int(plan[x][0]['total']) == case[3][0][x][0]['total']
                            assert int(plan[x][0]['count']) == case[3][0][x][0]['count']
                        else:
                            assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab']
                    if debug:
                        pp().pprint(report)
                    result = self.zakat(report, debug=debug)
                    if debug:
                        print('zakat-result', result, case[4])
                    assert result == case[4]
                    report = self.check(2.17, None, debug)
                    (valid, brief, plan) = report
                    assert valid is False

            history_size = len(self._vault['history'])
            if debug:
                print('history_size', history_size)
            assert history_size == 3
            assert not self.nolock()
            assert self.recall(False, debug) is False
            self.free(self.lock())
            assert self.nolock()

            for i in range(3, 0, -1):
                history_size = len(self._vault['history'])
                if debug:
                    print('history_size', history_size)
                assert history_size == i
                assert self.recall(False, debug) is True

            assert self.nolock()
            assert self.recall(False, debug) is False

            history_size = len(self._vault['history'])
            if debug:
                print('history_size', history_size)
            assert history_size == 0

            account_size = len(self._vault['account'])
            if debug:
                print('account_size', account_size)
            assert account_size == 0

            report_size = len(self._vault['report'])
            if debug:
                print('report_size', report_size)
            assert report_size == 0

            assert self.nolock()
            return True
        except:
            # pp().pprint(self._vault)
            assert self.export_json("test-snapshot.json")
            assert self.save(f"test-snapshot.{self.ext()}")
            raise
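The expected balances in the transfer cases above can be checked by hand. The following is a minimal sketch, not the library's implementation: it assumes balances are stored as integers scaled by 100 (which matches `track(1000)` yielding a balance of `100000`) and that a transfer converts through each account's exchange rate. The names `SCALE`, `scale`, `converted`, and `transfer` are hypothetical helpers introduced only for this illustration.

```python
from decimal import Decimal

SCALE = 100  # assumption: amounts are stored as integers with two decimal places


def scale(value) -> int:
    """Convert a human-readable amount to its scaled integer form."""
    return int(Decimal(str(value)) * SCALE)


def converted(amount_scaled: int, from_rate, to_rate) -> int:
    """Convert a scaled amount between accounts using their exchange rates."""
    return int(Decimal(amount_scaled) * Decimal(str(from_rate)) / Decimal(str(to_rate)))


# Opening balances and rates taken from the test cases
balances = {"a_SAR": scale(1000), "b_USD": scale(500), "c_SAR": scale(750)}
rates = {"a_SAR": 1, "b_USD": 3.75, "c_SAR": 1}


def transfer(amount, src: str, dst: str) -> None:
    """Move `amount` (in the source currency) from src to dst."""
    moved = scale(amount)
    balances[src] -= moved
    balances[dst] += converted(moved, rates[src], rates[dst])


transfer(100, "b_USD", "a_SAR")   # 100 USD -> 375 SAR
transfer(375, "c_SAR", "b_USD")   # 375 SAR -> 100 USD
transfer(3.75, "a_SAR", "b_USD")  # 3.75 SAR -> 1 USD
print(balances)
```

The final balances agree with the values the test asserts after the last transfer case (`137125`, `50100`) and with the `c_SAR_balance = 37500` used by the chunked-transfer loop.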
def test(debug: bool = False):
    ledger = ZakatTracker()
    start = ZakatTracker.time()
    assert ledger.test(debug=debug)
    if debug:
        print("#########################")
        print("######## TEST DONE ########")
        print("#########################")
        print(ZakatTracker.duration_from_nanoseconds(ZakatTracker.time() - start))
        print("#########################")
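The `'out'` totals in the multi-cycle check cases of `ZakatTracker.test` (for example `73140` for a scaled balance of `1000000` held for three cycles) can be reproduced with simple arithmetic. This sketch assumes a 2.5% rate applied once per elapsed cycle to the remaining balance; that assumption is chosen because it matches the expected values in the test, not because the source shown here confirms how the library computes it. `zakat_over_cycles` is a hypothetical helper.

```python
from decimal import Decimal

RATE = Decimal("0.025")  # assumption: 2.5% per completed cycle


def zakat_over_cycles(balance: int, cycles: int) -> int:
    """Total due when the rate is applied to the remaining balance each cycle."""
    remaining = Decimal(balance)
    total = Decimal(0)
    for _ in range(cycles):
        due = remaining * RATE
        total += due
        remaining -= due
    return int(total)  # truncate, as the test compares integer totals


one_cycle = [zakat_over_cycles(200_000, 1), zakat_over_cycles(375_000, 1)]
three_cycles = [zakat_over_cycles(1_000_000, 3), zakat_over_cycles(1_875_000, 3)]
print(one_cycle, three_cycles)
```

Each cycle is charged on what is left after the previous one, which is why the three-cycle total is slightly less than three times the single-cycle amount.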
class Action(Enum):
    CREATE = auto()
    TRACK = auto()
    LOG = auto()
    SUB = auto()
    ADD_FILE = auto()
    REMOVE_FILE = auto()
    BOX_TRANSFER = auto()
    EXCHANGE = auto()
    REPORT = auto()
    ZAKAT = auto()
class JSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Action) or isinstance(obj, MathOperation):
            return obj.name  # Serialize as the enum member's name
        elif isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)
Extensible JSON https://json.org encoder for Python data structures.
Supports the following objects and types by default:
+-------------------+---------------+
| Python            | JSON          |
+===================+===============+
| dict              | object        |
+-------------------+---------------+
| list, tuple       | array         |
+-------------------+---------------+
| str               | string        |
+-------------------+---------------+
| int, float        | number        |
+-------------------+---------------+
| True              | true          |
+-------------------+---------------+
| False             | false         |
+-------------------+---------------+
| None              | null          |
+-------------------+---------------+
To extend this to recognize other objects, subclass and implement a .default() method with another method that returns a serializable object for o if possible, otherwise it should call the superclass implementation (to raise TypeError).
    def default(self, obj):
        if isinstance(obj, Action) or isinstance(obj, MathOperation):
            return obj.name  # Serialize as the enum member's name
        elif isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this::
    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return super().default(o)
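As an illustration of the same pattern applied to enums and decimals, the following self-contained sketch mirrors the `default` override shown for this module's `JSONEncoder`. `DemoAction` and `DemoEncoder` are hypothetical stand-ins so the example runs without the zakat package installed.

```python
import json
from decimal import Decimal
from enum import Enum, auto


class DemoAction(Enum):  # hypothetical stand-in for the library's Action enum
    TRACK = auto()
    ZAKAT = auto()


class DemoEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Enum):
            return obj.name  # serialize as the enum member's name
        if isinstance(obj, Decimal):
            return float(obj)  # note: may lose precision for long decimals
        return super().default(obj)  # raises TypeError for anything else


encoded = json.dumps({"action": DemoAction.TRACK, "amount": Decimal("2.5")}, cls=DemoEncoder)
print(encoded)
```

Passing the encoder through `cls=` keeps call sites unchanged. Objects the encoder does not recognize still raise `TypeError`, because the final line defers to the base class.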
def start_file_server(database_path: str, database_callback: callable = None, csv_callback: callable = None,
                      debug: bool = False) -> tuple:
    """
    Starts a multi-purpose HTTP server to manage file interactions for a Zakat application.

    This server facilitates the following functionalities:

    1. GET /{file_uuid}/get: Download the database file specified by `database_path`.
    2. GET /{file_uuid}/upload: Display an HTML form for uploading files.
    3. POST /{file_uuid}/upload: Handle file uploads, distinguishing between:
        - Database File (.db): Replaces the existing database with the uploaded one.
        - CSV File (.csv): Imports data from the CSV into the existing database.

    Args:
        database_path (str): The path to the pickle database file.
        database_callback (callable, optional): A function to call after a successful database upload.
                                                It receives the uploaded database path as its argument.
        csv_callback (callable, optional): A function to call after a successful CSV upload. It receives the uploaded CSV path,
                                           the database path, and the debug flag as its arguments.
        debug (bool, optional): If True, print debugging information. Defaults to False.

    Returns:
        Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing:
            - file_name (str): The name of the database file.
            - download_url (str): The URL to download the database file.
            - upload_url (str): The URL to access the file upload form.
            - server_thread (threading.Thread): The thread running the server.
            - shutdown_server (Callable[[], None]): A function to gracefully shut down the server.

    Example:
        _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db")
        print(f"Download database: {download_url}")
        print(f"Upload files: {upload_url}")
        server_thread.start()
        # ... later ...
        shutdown_server()
    """
    file_uuid = uuid.uuid4()
    file_name = os.path.basename(database_path)

    port = find_available_port()
    download_url = f"http://localhost:{port}/{file_uuid}/get"
    upload_url = f"http://localhost:{port}/{file_uuid}/upload"

    class Handler(http.server.SimpleHTTPRequestHandler):
        def do_GET(self):
            if self.path == f"/{file_uuid}/get":
                # GET: Serve the existing file
                try:
                    with open(database_path, "rb") as f:
                        self.send_response(200)
                        self.send_header("Content-type", "application/octet-stream")
                        self.send_header("Content-Disposition", f'attachment; filename="{file_name}"')
                        self.end_headers()
                        self.wfile.write(f.read())
                except FileNotFoundError:
                    self.send_error(404, "File not found")
            elif self.path == f"/{file_uuid}/upload":
                # GET: Serve the upload form
                self.send_response(200)
                self.send_header("Content-type", "text/html")
                self.end_headers()
                self.wfile.write(f"""
                    <html lang="en">
                        <head>
                            <title>Zakat File Server</title>
                        </head>
                    <body>
                    <h1>Zakat File Server</h1>
                    <h3>You can download the <a target="__blank" href="{download_url}">database file</a>...</h3>
                    <h3>Or upload a new file to restore a database or import `CSV` file:</h3>
                    <form action="/{file_uuid}/upload" method="post" enctype="multipart/form-data">
                        <input type="file" name="file" required><br/>
                        <input type="radio" id="{FileType.Database.value}" name="upload_type" value="{FileType.Database.value}" required>
                        <label for="{FileType.Database.value}">Database File</label><br/>
                        <input type="radio" id="{FileType.CSV.value}" name="upload_type" value="{FileType.CSV.value}">
                        <label for="{FileType.CSV.value}">CSV File</label><br/>
                        <input type="submit" value="Upload"><br/>
                    </form>
                    </body></html>
                """.encode())
            else:
                self.send_error(404)

        def do_POST(self):
            if self.path == f"/{file_uuid}/upload":
                # POST: Handle request
                # 1. Get the Form Data
                form_data = cgi.FieldStorage(
                    fp=self.rfile,
                    headers=self.headers,
                    environ={'REQUEST_METHOD': 'POST'}
                )
                upload_type = form_data.getvalue("upload_type")

                if debug:
                    print('upload_type', upload_type)

                if upload_type not in [FileType.Database.value, FileType.CSV.value]:
                    self.send_error(400, "Invalid upload type")
                    return

                # 2. Extract File Data
                file_item = form_data['file']  # Assuming 'file' is your file input name

                # 3. Get File Details
                filename = file_item.filename
                file_data = file_item.file.read()  # Read the file's content

                if debug:
                    print(f'Uploaded filename: {filename}')

                # 4. Define Storage Path for CSV
                upload_directory = "./uploads"  # Create this directory if it doesn't exist
                os.makedirs(upload_directory, exist_ok=True)
                file_path = os.path.join(upload_directory, upload_type)

                # 5. Write to Disk
                with open(file_path, 'wb') as f:
                    f.write(file_data)

                match upload_type:
                    case FileType.Database.value:

                        try:
                            # 6. Verify database file
                            # ZakatTracker(db_path=file_path) # FATAL, Circular Imports Error
                            if database_callback is not None:
                                database_callback(file_path)

                            # 7. Copy database into the original path
                            shutil.copy2(file_path, database_path)
                        except Exception as e:
                            self.send_error(400, str(e))
                            return

                    case FileType.CSV.value:
                        # 6. Verify CSV file
                        try:
                            # x = ZakatTracker(db_path=database_path) # FATAL, Circular Imports Error
                            # result = x.import_csv(file_path, debug=debug)
                            if csv_callback is not None:
                                result = csv_callback(file_path, database_path, debug)
                                if debug:
                                    print(f'CSV imported: {result}')
                                if len(result[2]) != 0:
                                    self.send_response(200)
                                    self.end_headers()
                                    self.wfile.write(json.dumps(result).encode())
                                    return
                        except Exception as e:
                            self.send_error(400, str(e))
                            return

                self.send_response(200)
                self.end_headers()
                self.wfile.write(b"File uploaded successfully.")

    httpd = socketserver.TCPServer(("localhost", port), Handler)
    server_thread = threading.Thread(target=httpd.serve_forever)

    def shutdown_server():
        nonlocal httpd, server_thread
        httpd.shutdown()
        httpd.server_close()  # Close the socket
        server_thread.join()  # Wait for the thread to finish

    return file_name, download_url, upload_url, server_thread, shutdown_server
Starts a multi-purpose HTTP server to manage file interactions for a Zakat application.
This server facilitates the following functionalities:
- GET /{file_uuid}/get: Download the database file specified by database_path.
- GET /{file_uuid}/upload: Display an HTML form for uploading files.
- POST /{file_uuid}/upload: Handle file uploads, distinguishing between:
    - Database File (.db): Replaces the existing database with the uploaded one.
    - CSV File (.csv): Imports data from the CSV into the existing database.

Args:
    database_path (str): The path to the pickle database file.
    database_callback (callable, optional): A function to call after a successful database upload. It receives the uploaded database path as its argument.
    csv_callback (callable, optional): A function to call after a successful CSV upload. It receives the uploaded CSV path, the database path, and the debug flag as its arguments.
    debug (bool, optional): If True, print debugging information. Defaults to False.

Returns:
    Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing:
    - file_name (str): The name of the database file.
    - download_url (str): The URL to download the database file.
    - upload_url (str): The URL to access the file upload form.
    - server_thread (threading.Thread): The thread running the server.
    - shutdown_server (Callable[[], None]): A function to gracefully shut down the server.

Example:
    _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db")
    print(f"Download database: {download_url}")
    print(f"Upload files: {upload_url}")
    server_thread.start()
    # ... later ...
    shutdown_server()
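The server lifecycle described above (build the server, run `serve_forever` in a thread, stop it through a returned closure) can be tried in isolation. The sketch below is a reduced, hypothetical version that serves a fixed byte string at a random UUID path. `start_demo_server` and its return shape are assumptions made for this example and are not part of the zakat package.

```python
import http.client
import http.server
import socketserver
import threading
import uuid


def start_demo_server(payload: bytes):
    """Hypothetical helper: serve `payload` at /<uuid>/get on 127.0.0.1."""
    token = uuid.uuid4()
    path = f"/{token}/get"

    class Handler(http.server.BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path == path:
                self.send_response(200)
                self.send_header("Content-Type", "application/octet-stream")
                self.send_header("Content-Length", str(len(payload)))
                self.end_headers()
                self.wfile.write(payload)
            else:
                self.send_error(404)

        def log_message(self, format, *args):
            pass  # keep the demo quiet

    # Port 0 lets the OS pick a free port; the server keeps the socket open.
    httpd = socketserver.TCPServer(("127.0.0.1", 0), Handler)
    port = httpd.server_address[1]
    thread = threading.Thread(target=httpd.serve_forever, daemon=True)

    def shutdown():
        httpd.shutdown()      # stop the serve_forever loop
        httpd.server_close()  # release the listening socket
        thread.join()         # wait for the thread to finish

    return port, path, thread, shutdown


port, path, thread, shutdown = start_demo_server(b"demo")
thread.start()  # as in the example above, the caller starts the thread

conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
conn.request("GET", path)
response = conn.getresponse()
status, body = response.status, response.read()
conn.close()

shutdown()
```

`shutdown()` on a `socketserver` server blocks until the `serve_forever` loop exits, so the shutdown closure should only be called after the thread has been started.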
def find_available_port() -> int:
    """
    Finds and returns an available TCP port on the local machine.

    This function utilizes a TCP server socket to bind to port 0, which
    instructs the operating system to automatically assign an available
    port. The assigned port is then extracted and returned.

    Returns:
        int: The available TCP port number.

    Raises:
        OSError: If an error occurs during the port binding process, such
            as all ports being in use.

    Example:
        port = find_available_port()
        print(f"Available port: {port}")
    """
    with socketserver.TCPServer(("localhost", 0), None) as s:
        return s.server_address[1]
Finds and returns an available TCP port on the local machine.
This function utilizes a TCP server socket to bind to port 0, which instructs the operating system to automatically assign an available port. The assigned port is then extracted and returned.
Returns:
    int: The available TCP port number.

Raises:
    OSError: If an error occurs during the port binding process, such as all ports being in use.

Example:
    port = find_available_port()
    print(f"Available port: {port}")
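The bind-to-port-0 technique also works with a plain socket. This is a minimal sketch using only the standard `socket` module; `pick_free_port` is a hypothetical name, not a function of this package.

```python
import socket


def pick_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for a free TCP port by binding to port 0, then release it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, 0))
        return s.getsockname()[1]


port = pick_free_port()
print(f"Available port: {port}")
```

The socket is closed before the port number is used, so another process could claim the port in the meantime. Code that needs a guarantee can instead bind its server to port 0 directly and read the assigned port from the server object.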