zakat
xxx
_____ _ _ _ _ _
|__ /__ _| | ____ _| |_ | | (_) |__ _ __ __ _ _ __ _ _
/ // _| |/ / _
| __| | | | | '_ \| '__/ _` | '__| | | |
/ /| (_| | < (_| | |_ | |___| | |_) | | | (_| | | | |_| |
/______,_|_|___,_|__| |_____|_|_.__/|_| __,_|_| __, |
|___/
"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف ... Never Trust, Always Verify ...
This file provides the ZakatLibrary classes, functions for tracking and calculating Zakat.
1""" 2 _____ _ _ _ _ _ 3|__ /__ _| | ____ _| |_ | | (_) |__ _ __ __ _ _ __ _ _ 4 / // _` | |/ / _` | __| | | | | '_ \| '__/ _` | '__| | | | 5 / /| (_| | < (_| | |_ | |___| | |_) | | | (_| | | | |_| | 6/____\__,_|_|\_\__,_|\__| |_____|_|_.__/|_| \__,_|_| \__, | 7 |___/ 8 9"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف 10... Never Trust, Always Verify ... 11 12This file provides the ZakatLibrary classes, functions for tracking and calculating Zakat. 13""" 14# Importing necessary classes and functions from the main module 15from zakat.zakat_tracker import ( 16 ZakatTracker, 17 test, 18 Action, 19 JSONEncoder, 20 MathOperation, 21) 22 23from zakat.file_server import ( 24 start_file_server, 25 find_available_port, 26 FileType, 27) 28 29# Version information for the module 30__version__ = ZakatTracker.Version() 31__all__ = [ 32 "ZakatTracker", 33 "test", 34 "Action", 35 "JSONEncoder", 36 "MathOperation", 37 "start_file_server", 38 "find_available_port", 39 "FileType", 40]
122class ZakatTracker: 123 """ 124 A class for tracking and calculating Zakat. 125 126 This class provides functionalities for recording transactions, calculating Zakat due, 127 and managing account balances. It also offers features like importing transactions from 128 CSV files, exporting data to JSON format, and saving/loading the tracker state. 129 130 The `ZakatTracker` class is designed to handle both positive and negative transactions, 131 allowing for flexible tracking of financial activities related to Zakat. It also supports 132 the concept of a "Nisab" (minimum threshold for Zakat) and a "haul" (complete one year for Transaction) can calculate Zakat due 133 based on the current silver price. 134 135 The class uses a camel file as its database to persist the tracker state, 136 ensuring data integrity across sessions. It also provides options for enabling or 137 disabling history tracking, allowing users to choose their preferred level of detail. 138 139 In addition, the `ZakatTracker` class includes various helper methods like 140 `time`, `time_to_datetime`, `lock`, `free`, `recall`, `export_json`, 141 and more. These methods provide additional functionalities and flexibility 142 for interacting with and managing the Zakat tracker. 143 144 Attributes: 145 ZakatTracker.ZakatCut (function): A function to calculate the Zakat percentage. 146 ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat. 147 ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price. 148 ZakatTracker.Version (function): The version of the ZakatTracker class. 149 150 Data Structure: 151 The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data. 152 153 _vault (dict): 154 - account (dict): 155 - {account_number} (dict): 156 - balance (int): The current balance of the account. 157 - box (dict): A dictionary storing transaction details. 158 - {timestamp} (dict): 159 - capital (int): The initial amount of the transaction. 160 - count (int): The number of times Zakat has been calculated for this transaction. 161 - last (int): The timestamp of the last Zakat calculation. 162 - rest (int): The remaining amount after Zakat deductions and withdrawal. 163 - total (int): The total Zakat deducted from this transaction. 164 - count (int): The total number of transactions for the account. 165 - log (dict): A dictionary storing transaction logs. 166 - {timestamp} (dict): 167 - value (int): The transaction amount (positive or negative). 168 - desc (str): The description of the transaction. 169 - ref (int): The box reference (positive or None). 170 - file (dict): A dictionary storing file references associated with the transaction. 171 - hide (bool): Indicates whether the account is hidden or not. 172 - zakatable (bool): Indicates whether the account is subject to Zakat. 173 - exchange (dict): 174 - account (dict): 175 - {timestamps} (dict): 176 - rate (float): Exchange rate when compared to local currency. 177 - description (str): The description of the exchange rate. 178 - history (dict): 179 - {timestamp} (list): A list of dictionaries storing the history of actions performed. 180 - {action_dict} (dict): 181 - action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT). 182 - account (str): The account number associated with the action. 183 - ref (int): The reference number of the transaction. 184 - file (int): The reference number of the file (if applicable). 185 - key (str): The key associated with the action (e.g., 'rest', 'total'). 186 - value (int): The value associated with the action. 187 - math (MathOperation): The mathematical operation performed (if applicable). 188 - lock (int or None): The timestamp indicating the current lock status (None if not locked). 189 - report (dict): 190 - {timestamp} (tuple): A tuple storing Zakat report details. 191 192 """ 193 194 @staticmethod 195 def Version() -> str: 196 """ 197 Returns the current version of the software. 198 199 This function returns a string representing the current version of the software, 200 including major, minor, and patch version numbers in the format "X.Y.Z". 201 202 Returns: 203 str: The current version of the software. 204 """ 205 return '0.2.86' 206 207 @staticmethod 208 def ZakatCut(x: float) -> float: 209 """ 210 Calculates the Zakat amount due on an asset. 211 212 This function calculates the zakat amount due on a given asset value over one lunar year. 213 Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth 214 that exceeds a certain threshold (Nisab). 215 216 Parameters: 217 x: The total value of the asset on which Zakat is to be calculated. 218 219 Returns: 220 The amount of Zakat due on the asset, calculated as 2.5% of the asset's value. 221 """ 222 return 0.025 * x # Zakat Cut in one Lunar Year 223 224 @staticmethod 225 def TimeCycle(days: int = 355) -> int: 226 """ 227 Calculates the approximate duration of a lunar year in nanoseconds. 228 229 This function calculates the approximate duration of a lunar year based on the given number of days. 230 It converts the given number of days into nanoseconds for use in high-precision timing applications. 231 232 Parameters: 233 days: The number of days in a lunar year. Defaults to 355, 234 which is an approximation of the average length of a lunar year. 235 236 Returns: 237 The approximate duration of a lunar year in nanoseconds. 238 """ 239 return int(60 * 60 * 24 * days * 1e9) # Lunar Year in nanoseconds 240 241 @staticmethod 242 def Nisab(gram_price: float, gram_quantity: float = 595) -> float: 243 """ 244 Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram. 245 246 This function calculates the Nisab value, which is the minimum threshold of wealth, 247 that makes an individual liable for paying Zakat. 248 The Nisab value is determined by the equivalent value of a specific amount 249 of gold or silver (currently 595 grams in silver) in the local currency. 250 251 Parameters: 252 - gram_price (float): The price per gram of Nisab. 253 - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver. 254 255 Returns: 256 - float: The total value of Nisab based on the given price per gram. 257 """ 258 return gram_price * gram_quantity 259 260 @staticmethod 261 def ext() -> str: 262 """ 263 Returns the file extension used by the ZakatTracker class. 264 265 Returns: 266 str: The file extension used by the ZakatTracker class, which is 'camel'. 267 """ 268 return 'camel' 269 270 def __init__(self, db_path: str = "./zakat_db/zakat.camel", history_mode: bool = True): 271 """ 272 Initialize ZakatTracker with database path and history mode. 273 274 Parameters: 275 db_path (str): The path to the database file. Default is "zakat.camel". 276 history_mode (bool): The mode for tracking history. Default is True. 277 278 Returns: 279 None 280 """ 281 self._base_path = None 282 self._vault_path = None 283 self._vault = None 284 self.reset() 285 self._history(history_mode) 286 self.path(db_path) 287 288 def path(self, path: str = None) -> str: 289 """ 290 Set or get the path to the database file. 291 292 If no path is provided, the current path is returned. 293 If a path is provided, it is set as the new path. 294 The function also creates the necessary directories if the provided path is a file. 295 296 Parameters: 297 path (str): The new path to the database file. If not provided, the current path is returned. 298 299 Returns: 300 str: The current or new path to the database file. 301 """ 302 if path is None: 303 return self._vault_path 304 self._vault_path = Path(path).resolve() 305 base_path = Path(path).resolve() 306 if base_path.is_file() or base_path.suffix: 307 base_path = base_path.parent 308 base_path.mkdir(parents=True, exist_ok=True) 309 self._base_path = base_path 310 return str(self._vault_path) 311 312 def base_path(self, *args) -> str: 313 """ 314 Generate a base path by joining the provided arguments with the existing base path. 315 316 Parameters: 317 *args (str): Variable length argument list of strings to be joined with the base path. 318 319 Returns: 320 str: The generated base path. If no arguments are provided, the existing base path is returned. 321 """ 322 if not args: 323 return str(self._base_path) 324 filtered_args = [] 325 ignored_filename = None 326 for arg in args: 327 if Path(arg).suffix: 328 ignored_filename = arg 329 else: 330 filtered_args.append(arg) 331 base_path = Path(self._base_path) 332 full_path = base_path.joinpath(*filtered_args) 333 full_path.mkdir(parents=True, exist_ok=True) 334 if ignored_filename is not None: 335 return full_path.resolve() / ignored_filename # Join with the ignored filename 336 return str(full_path.resolve()) 337 338 @staticmethod 339 def scale(x: float | int | Decimal, decimal_places: int = 2) -> int: 340 """ 341 Scales a numerical value by a specified power of 10, returning an integer. 342 343 This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and 344 facilitate precise scaling operations, particularly useful in financial or scientific calculations. 345 346 Parameters: 347 x: The numeric value to scale. Can be a floating-point number, integer, or decimal. 348 decimal_places: The exponent for the scaling factor (10**y). Defaults to 2, meaning the input is scaled 349 by a factor of 100 (e.g., converts 1.23 to 123). 350 351 Returns: 352 The scaled value, rounded to the nearest integer. 353 354 Raises: 355 TypeError: If the input `x` is not a valid numeric type. 356 357 Examples: 358 >>> ZakatTracker.scale(3.14159) 359 314 360 >>> ZakatTracker.scale(1234, decimal_places=3) 361 1234000 362 >>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4) 363 50 364 """ 365 if not isinstance(x, (float, int, Decimal)): 366 raise TypeError("Input 'x' must be a float, int, or Decimal.") 367 return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places)) 368 369 @staticmethod 370 def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal: 371 """ 372 Unscales an integer by a power of 10. 373 374 Parameters: 375 x: The integer to unscale. 376 return_type: The desired type for the returned value. Can be float, int, or Decimal. Defaults to float. 377 decimal_places: The power of 10 to use. Defaults to 2. 378 379 Returns: 380 The unscaled number, converted to the specified return_type. 381 382 Raises: 383 TypeError: If the return_type is not float or Decimal. 384 """ 385 if return_type not in (float, Decimal): 386 raise TypeError(f'Invalid return_type({return_type}). Supported types are float, int, and Decimal.') 387 return round(return_type(x / (10 ** decimal_places)), decimal_places) 388 389 def _history(self, status: bool = None) -> bool: 390 """ 391 Enable or disable history tracking. 392 393 Parameters: 394 status (bool): The status of history tracking. Default is True. 395 396 Returns: 397 None 398 """ 399 if status is not None: 400 self._history_mode = status 401 return self._history_mode 402 403 def reset(self) -> None: 404 """ 405 Reset the internal data structure to its initial state. 406 407 Parameters: 408 None 409 410 Returns: 411 None 412 """ 413 self._vault = { 414 'account': {}, 415 'exchange': {}, 416 'history': {}, 417 'lock': None, 418 'report': {}, 419 } 420 421 @staticmethod 422 def time(now: datetime = None) -> int: 423 """ 424 Generates a timestamp based on the provided datetime object or the current datetime. 425 426 Parameters: 427 now (datetime, optional): The datetime object to generate the timestamp from. 428 If not provided, the current datetime is used. 429 430 Returns: 431 int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970), 432 before 1970 will return in negative until 1000AD. 433 """ 434 if now is None: 435 now = datetime.datetime.now() 436 ordinal_day = now.toordinal() 437 ns_in_day = (now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() * 10 ** 9 438 return int((ordinal_day - 719_163) * 86_400_000_000_000 + ns_in_day) 439 440 @staticmethod 441 def time_to_datetime(ordinal_ns: int) -> datetime: 442 """ 443 Converts an ordinal number (number of days since 1000-01-01) to a datetime object. 444 445 Parameters: 446 ordinal_ns (int): The ordinal number of days since 1000-01-01. 447 448 Returns: 449 datetime: The corresponding datetime object. 450 """ 451 ordinal_day = ordinal_ns // 86_400_000_000_000 + 719_163 452 ns_in_day = ordinal_ns % 86_400_000_000_000 453 d = datetime.datetime.fromordinal(ordinal_day) 454 t = datetime.timedelta(seconds=ns_in_day // 10 ** 9) 455 return datetime.datetime.combine(d, datetime.time()) + t 456 457 def clean_history(self, lock: int | None = None) -> int: 458 """ 459 Cleans up the history of actions performed on the ZakatTracker instance. 460 461 Parameters: 462 lock (int, optional): The lock ID is used to clean up the empty history. 463 If not provided, it cleans up the empty history records for all locks. 464 465 Returns: 466 int: The number of locks cleaned up. 467 """ 468 count = 0 469 if lock in self._vault['history']: 470 if len(self._vault['history'][lock]) <= 0: 471 count += 1 472 del self._vault['history'][lock] 473 return count 474 self.free(self.lock()) 475 for lock in self._vault['history']: 476 if len(self._vault['history'][lock]) <= 0: 477 count += 1 478 del self._vault['history'][lock] 479 return count 480 481 def _step(self, action: Action = None, account=None, ref: int = None, file: int = None, value: float = None, 482 key: str = None, math_operation: MathOperation = None) -> int: 483 """ 484 This method is responsible for recording the actions performed on the ZakatTracker. 485 486 Parameters: 487 - action (Action): The type of action performed. 488 - account (str): The account number on which the action was performed. 489 - ref (int): The reference number of the action. 490 - file (int): The file reference number of the action. 491 - value (int): The value associated with the action. 492 - key (str): The key associated with the action. 493 - math_operation (MathOperation): The mathematical operation performed during the action. 494 495 Returns: 496 - int: The lock time of the recorded action. If no lock was performed, it returns 0. 497 """ 498 if not self._history(): 499 return 0 500 lock = self._vault['lock'] 501 if self.nolock(): 502 lock = self._vault['lock'] = self.time() 503 self._vault['history'][lock] = [] 504 if action is None: 505 return lock 506 self._vault['history'][lock].append({ 507 'action': action, 508 'account': account, 509 'ref': ref, 510 'file': file, 511 'key': key, 512 'value': value, 513 'math': math_operation, 514 }) 515 return lock 516 517 def nolock(self) -> bool: 518 """ 519 Check if the vault lock is currently not set. 520 521 Returns: 522 bool: True if the vault lock is not set, False otherwise. 523 """ 524 return self._vault['lock'] is None 525 526 def lock(self) -> int: 527 """ 528 Acquires a lock on the ZakatTracker instance. 529 530 Returns: 531 int: The lock ID. This ID can be used to release the lock later. 532 """ 533 return self._step() 534 535 def vault(self) -> dict: 536 """ 537 Returns a copy of the internal vault dictionary. 538 539 This method is used to retrieve the current state of the ZakatTracker object. 540 It provides a snapshot of the internal data structure, allowing for further 541 processing or analysis. 542 543 Returns: 544 dict: A copy of the internal vault dictionary. 545 """ 546 return self._vault.copy() 547 548 def stats(self) -> dict[str, tuple]: 549 """ 550 Calculates and returns statistics about the object's data storage. 551 552 This method determines the size of the database file on disk and the 553 size of the data currently held in RAM (likely within a dictionary). 554 Both sizes are reported in bytes and in a human-readable format 555 (e.g., KB, MB). 556 557 Returns: 558 dict[str, tuple]: A dictionary containing the following statistics: 559 560 * 'database': A tuple with two elements: 561 - The database file size in bytes (int). 562 - The database file size in human-readable format (str). 563 * 'ram': A tuple with two elements: 564 - The RAM usage (dictionary size) in bytes (int). 565 - The RAM usage in human-readable format (str). 566 567 Example: 568 >>> stats = my_object.stats() 569 >>> print(stats['database']) 570 (256000, '250.0 KB') 571 >>> print(stats['ram']) 572 (12345, '12.1 KB') 573 """ 574 ram_size = self.get_dict_size(self.vault()) 575 file_size = os.path.getsize(self.path()) 576 return { 577 'database': (file_size, self.human_readable_size(file_size)), 578 'ram': (ram_size, self.human_readable_size(ram_size)), 579 } 580 581 def files(self) -> list[dict[str, str | int]]: 582 """ 583 Retrieves information about files associated with this class. 584 585 This class method provides a standardized way to gather details about 586 files used by the class for storage, snapshots, and CSV imports. 587 588 Returns: 589 list[dict[str, str | int]]: A list of dictionaries, each containing information 590 about a specific file: 591 592 * type (str): The type of file ('database', 'snapshot', 'import_csv'). 593 * path (str): The full file path. 594 * exists (bool): Whether the file exists on the filesystem. 595 * size (int): The file size in bytes (0 if the file doesn't exist). 596 * human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB'). 597 598 Example: 599 ``` 600 file_info = MyClass.files() 601 for info in file_info: 602 print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}") 603 ``` 604 """ 605 result = [] 606 for file_type, path in { 607 'database': self.path(), 608 'snapshot': self.snapshot_cache_path(), 609 'import_csv': self.import_csv_cache_path(), 610 }.items(): 611 exists = os.path.exists(path) 612 size = os.path.getsize(path) if exists else 0 613 human_readable_size = self.human_readable_size(size) if exists else 0 614 result.append({ 615 'type': file_type, 616 'path': path, 617 'exists': exists, 618 'size': size, 619 'human_readable_size': human_readable_size, 620 }) 621 return result 622 623 def steps(self) -> dict: 624 """ 625 Returns a copy of the history of steps taken in the ZakatTracker. 626 627 The history is a dictionary where each key is a unique identifier for a step, 628 and the corresponding value is a dictionary containing information about the step. 629 630 Returns: 631 dict: A copy of the history of steps taken in the ZakatTracker. 632 """ 633 return self._vault['history'].copy() 634 635 def free(self, lock: int, auto_save: bool = True) -> bool: 636 """ 637 Releases the lock on the database. 638 639 Parameters: 640 lock (int): The lock ID to be released. 641 auto_save (bool): Whether to automatically save the database after releasing the lock. 642 643 Returns: 644 bool: True if the lock is successfully released and (optionally) saved, False otherwise. 645 """ 646 if lock == self._vault['lock']: 647 self._vault['lock'] = None 648 self.clean_history(lock) 649 if auto_save: 650 return self.save(self.path()) 651 return True 652 return False 653 654 def account_exists(self, account) -> bool: 655 """ 656 Check if the given account exists in the vault. 657 658 Parameters: 659 account (str): The account number to check. 660 661 Returns: 662 bool: True if the account exists, False otherwise. 663 """ 664 return account in self._vault['account'] 665 666 def box_size(self, account) -> int: 667 """ 668 Calculate the size of the box for a specific account. 669 670 Parameters: 671 account (str): The account number for which the box size needs to be calculated. 672 673 Returns: 674 int: The size of the box for the given account. If the account does not exist, -1 is returned. 675 """ 676 if self.account_exists(account): 677 return len(self._vault['account'][account]['box']) 678 return -1 679 680 def log_size(self, account) -> int: 681 """ 682 Get the size of the log for a specific account. 683 684 Parameters: 685 account (str): The account number for which the log size needs to be calculated. 686 687 Returns: 688 int: The size of the log for the given account. If the account does not exist, -1 is returned. 689 """ 690 if self.account_exists(account): 691 return len(self._vault['account'][account]['log']) 692 return -1 693 694 @staticmethod 695 def file_hash(file_path: str, algorithm: str = "blake2b") -> str: 696 """ 697 Calculates the hash of a file using the specified algorithm. 698 699 Parameters: 700 file_path (str): The path to the file. 701 algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b". 702 703 Returns: 704 str: The hexadecimal representation of the file's hash. 705 """ 706 hash_obj = hashlib.new(algorithm) # Create the hash object 707 with open(file_path, "rb") as f: # Open file in binary mode for reading 708 for chunk in iter(lambda: f.read(4096), b""): # Read file in chunks 709 hash_obj.update(chunk) 710 return hash_obj.hexdigest() # Return the hash as a hexadecimal string 711 712 def snapshot_cache_path(self): 713 """ 714 Generate the path for the cache file used to store snapshots. 715 716 The cache file is a camel file that stores the timestamps of the snapshots. 717 The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel". 718 719 Returns: 720 str: The path to the cache file. 721 """ 722 path = str(self.path()) 723 ext = self.ext() 724 ext_len = len(ext) 725 if path.endswith(f'.{ext}'): 726 path = path[:-ext_len-1] 727 _, filename = os.path.split(path + f'.snapshots.{ext}') 728 return self.base_path(filename) 729 730 def snapshot(self) -> bool: 731 """ 732 This function creates a snapshot of the current database state. 733 734 The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. 735 If a snapshot with the same hash exists, the function returns True without creating a new snapshot. 736 If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state 737 in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp. 738 739 Parameters: 740 None 741 742 Returns: 743 bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails. 744 """ 745 current_hash = self.file_hash(self.path()) 746 cache: dict[str, int] = {} # hash: time_ns 747 try: 748 with open(self.snapshot_cache_path(), 'r') as stream: 749 cache = camel.load(stream.read()) 750 except: 751 pass 752 if current_hash in cache: 753 return True 754 time = time_ns() 755 cache[current_hash] = time 756 if not self.save(self.base_path('snapshots', f'{time}.{self.ext()}')): 757 return False 758 with open(self.snapshot_cache_path(), 'w') as stream: 759 stream.write(camel.dump(cache)) 760 return True 761 762 def snapshots(self, hide_missing: bool = True, verified_hash_only: bool = False) \ 763 -> dict[int, tuple[str, str, bool]]: 764 """ 765 Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status. 766 767 Parameters: 768 - hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True. 769 - verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False. 770 771 Returns: 772 - dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, 773 and the values are tuples containing the snapshot's hash, path, and existence status. 774 """ 775 cache: dict[str, int] = {} # hash: time_ns 776 try: 777 with open(self.snapshot_cache_path(), 'r') as stream: 778 cache = camel.load(stream.read()) 779 except: 780 pass 781 if not cache: 782 return {} 783 result: dict[int, tuple[str, str, bool]] = {} # time_ns: (hash, path, exists) 784 for file_hash, ref in cache.items(): 785 path = self.base_path('snapshots', f'{ref}.{self.ext()}') 786 exists = os.path.exists(path) 787 valid_hash = self.file_hash(path) == file_hash if verified_hash_only else True 788 if (verified_hash_only and not valid_hash) or (verified_hash_only and not exists): 789 continue 790 if exists or not hide_missing: 791 result[ref] = (file_hash, path, exists) 792 return result 793 794 def recall(self, dry=True, debug=False) -> bool: 795 """ 796 Revert the last operation. 797 798 Parameters: 799 dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. 800 debug (bool): If True, the function will print debug information. Default is False. 801 802 Returns: 803 bool: True if the operation was successful, False otherwise. 804 """ 805 if not self.nolock() or len(self._vault['history']) == 0: 806 return False 807 if len(self._vault['history']) <= 0: 808 return False 809 ref = sorted(self._vault['history'].keys())[-1] 810 if debug: 811 print('recall', ref) 812 memory = self._vault['history'][ref] 813 if debug: 814 print(type(memory), 'memory', memory) 815 limit = len(memory) + 1 816 sub_positive_log_negative = 0 817 for i in range(-1, -limit, -1): 818 x = memory[i] 819 if debug: 820 print(type(x), x) 821 match x['action']: 822 case Action.CREATE: 823 if x['account'] is not None: 824 if self.account_exists(x['account']): 825 if debug: 826 print('account', self._vault['account'][x['account']]) 827 assert len(self._vault['account'][x['account']]['box']) == 0 828 assert self._vault['account'][x['account']]['balance'] == 0 829 assert self._vault['account'][x['account']]['count'] == 0 830 if dry: 831 continue 832 del self._vault['account'][x['account']] 833 834 case Action.TRACK: 835 if x['account'] is not None: 836 if self.account_exists(x['account']): 837 if dry: 838 continue 839 self._vault['account'][x['account']]['balance'] -= x['value'] 840 self._vault['account'][x['account']]['count'] -= 1 841 del self._vault['account'][x['account']]['box'][x['ref']] 842 843 case Action.LOG: 844 if x['account'] is not None: 845 if self.account_exists(x['account']): 846 if x['ref'] in self._vault['account'][x['account']]['log']: 847 if dry: 848 continue 849 if sub_positive_log_negative == -x['value']: 850 self._vault['account'][x['account']]['count'] -= 1 851 sub_positive_log_negative = 0 852 box_ref = self._vault['account'][x['account']]['log'][x['ref']]['ref'] 853 if not box_ref is None: 854 assert self.box_exists(x['account'], box_ref) 855 box_value = self._vault['account'][x['account']]['log'][x['ref']]['value'] 856 assert box_value < 0 857 858 try: 859 self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value 860 except TypeError: 861 self._vault['account'][x['account']]['box'][box_ref]['rest'] += Decimal( 862 -box_value) 863 864 try: 865 self._vault['account'][x['account']]['balance'] += -box_value 866 except TypeError: 867 self._vault['account'][x['account']]['balance'] += Decimal(-box_value) 868 869 self._vault['account'][x['account']]['count'] -= 1 870 del self._vault['account'][x['account']]['log'][x['ref']] 871 872 case Action.SUB: 873 if x['account'] is not None: 874 if self.account_exists(x['account']): 875 if x['ref'] in self._vault['account'][x['account']]['box']: 876 if dry: 877 continue 878 self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value'] 879 self._vault['account'][x['account']]['balance'] += x['value'] 880 sub_positive_log_negative = x['value'] 881 882 case Action.ADD_FILE: 883 if x['account'] is not None: 884 if self.account_exists(x['account']): 885 if x['ref'] in self._vault['account'][x['account']]['log']: 886 if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']: 887 if dry: 888 continue 889 del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] 890 891 case Action.REMOVE_FILE: 892 if x['account'] is not None: 893 if self.account_exists(x['account']): 894 if x['ref'] in self._vault['account'][x['account']]['log']: 895 if dry: 896 continue 897 self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value'] 898 899 case Action.BOX_TRANSFER: 900 if x['account'] is not None: 901 if self.account_exists(x['account']): 902 if x['ref'] in self._vault['account'][x['account']]['box']: 903 if dry: 904 continue 905 self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value'] 906 907 case Action.EXCHANGE: 908 if x['account'] is not None: 909 if x['account'] in self._vault['exchange']: 910 if x['ref'] in self._vault['exchange'][x['account']]: 911 if dry: 912 continue 913 del self._vault['exchange'][x['account']][x['ref']] 914 915 case Action.REPORT: 916 if x['ref'] in self._vault['report']: 917 if dry: 918 continue 919 del self._vault['report'][x['ref']] 920 921 case Action.ZAKAT: 922 if x['account'] is not None: 923 if self.account_exists(x['account']): 924 if x['ref'] in self._vault['account'][x['account']]['box']: 925 if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]: 926 if dry: 927 continue 928 match x['math']: 929 case MathOperation.ADDITION: 930 self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x[ 931 'value'] 932 case MathOperation.EQUAL: 933 self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value'] 934 case MathOperation.SUBTRACTION: 935 self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x[ 936 'value'] 937 938 if not dry: 939 del self._vault['history'][ref] 940 return True 941 942 def ref_exists(self, account: str, ref_type: str, ref: int) -> bool: 943 """ 944 Check if a specific reference (transaction) exists in the vault for a given account and reference type. 945 946 Parameters: 947 account (str): The account number for which to check the existence of the reference. 948 ref_type (str): The type of reference (e.g., 'box', 'log', etc.). 949 ref (int): The reference (transaction) number to check for existence. 950 951 Returns: 952 bool: True if the reference exists for the given account and reference type, False otherwise. 953 """ 954 if account in self._vault['account']: 955 return ref in self._vault['account'][account][ref_type] 956 return False 957 958 def box_exists(self, account: str, ref: int) -> bool: 959 """ 960 Check if a specific box (transaction) exists in the vault for a given account and reference. 961 962 Parameters: 963 - account (str): The account number for which to check the existence of the box. 964 - ref (int): The reference (transaction) number to check for existence. 965 966 Returns: 967 - bool: True if the box exists for the given account and reference, False otherwise. 968 """ 969 return self.ref_exists(account, 'box', ref) 970 971 def track(self, unscaled_value: float | int | Decimal = 0, desc: str = '', account: str = 1, logging: bool = True, 972 created: int = None, 973 debug: bool = False) -> int: 974 """ 975 This function tracks a transaction for a specific account. 976 977 Parameters: 978 unscaled_value (float | int | Decimal): The value of the transaction. Default is 0. 979 desc (str): The description of the transaction. Default is an empty string. 980 account (str): The account for which the transaction is being tracked. Default is '1'. 981 logging (bool): Whether to log the transaction. Default is True. 982 created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. 983 debug (bool): Whether to print debug information. Default is False. 984 985 Returns: 986 int: The timestamp of the transaction. 987 988 This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box. 989 990 Raises: 991 ValueError: The log transaction happened again in the same nanosecond time. 992 ValueError: The box transaction happened again in the same nanosecond time. 993 """ 994 if debug: 995 print('track', f'unscaled_value={unscaled_value}, debug={debug}') 996 if created is None: 997 created = self.time() 998 no_lock = self.nolock() 999 self.lock() 1000 if not self.account_exists(account): 1001 if debug: 1002 print(f"account {account} created") 1003 self._vault['account'][account] = { 1004 'balance': 0, 1005 'box': {}, 1006 'count': 0, 1007 'log': {}, 1008 'hide': False, 1009 'zakatable': True, 1010 } 1011 self._step(Action.CREATE, account) 1012 if unscaled_value == 0: 1013 if no_lock: 1014 self.free(self.lock()) 1015 return 0 1016 value = self.scale(unscaled_value) 1017 if logging: 1018 self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug) 1019 if debug: 1020 print('create-box', created) 1021 if self.box_exists(account, created): 1022 raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).") 1023 if debug: 1024 print('created-box', created) 1025 self._vault['account'][account]['box'][created] = { 1026 'capital': value, 1027 'count': 0, 1028 'last': 0, 1029 'rest': value, 1030 'total': 0, 1031 } 1032 self._step(Action.TRACK, account, ref=created, value=value) 1033 if no_lock: 1034 self.free(self.lock()) 1035 return created 1036 1037 def log_exists(self, account: str, ref: int) -> bool: 1038 """ 1039 Checks if a specific transaction log entry exists for a given account. 1040 1041 Parameters: 1042 account (str): The account number associated with the transaction log. 1043 ref (int): The reference to the transaction log entry. 1044 1045 Returns: 1046 bool: True if the transaction log entry exists, False otherwise. 1047 """ 1048 return self.ref_exists(account, 'log', ref) 1049 1050 def _log(self, value: float, desc: str = '', account: str = 1, created: int = None, ref: int = None, 1051 debug: bool = False) -> int: 1052 """ 1053 Log a transaction into the account's log. 1054 1055 Parameters: 1056 value (float): The value of the transaction. 1057 desc (str): The description of the transaction. 1058 account (str): The account to log the transaction into. Default is '1'. 1059 created (int): The timestamp of the transaction. If not provided, it will be generated. 1060 1061 Returns: 1062 int: The timestamp of the logged transaction. 1063 1064 This method updates the account's balance, count, and log with the transaction details. 1065 It also creates a step in the history of the transaction. 1066 1067 Raises: 1068 ValueError: The log transaction happened again in the same nanosecond time. 1069 """ 1070 if debug: 1071 print('_log', f'debug={debug}') 1072 if created is None: 1073 created = self.time() 1074 try: 1075 self._vault['account'][account]['balance'] += value 1076 except TypeError: 1077 self._vault['account'][account]['balance'] += Decimal(value) 1078 self._vault['account'][account]['count'] += 1 1079 if debug: 1080 print('create-log', created) 1081 if self.log_exists(account, created): 1082 raise ValueError(f"The log transaction happened again in the same nanosecond time({created}).") 1083 if debug: 1084 print('created-log', created) 1085 self._vault['account'][account]['log'][created] = { 1086 'value': value, 1087 'desc': desc, 1088 'ref': ref, 1089 'file': {}, 1090 } 1091 self._step(Action.LOG, account, ref=created, value=value) 1092 return created 1093 1094 def exchange(self, account, created: int = None, rate: float = None, description: str = None, 1095 debug: bool = False) -> dict: 1096 """ 1097 This method is used to record or retrieve exchange rates for a specific account. 1098 1099 Parameters: 1100 - account (str): The account number for which the exchange rate is being recorded or retrieved. 1101 - created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used. 1102 - rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate. 1103 - description (str): A description of the exchange rate. 1104 1105 Returns: 1106 - dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, 1107 it returns a dictionary with default values for the rate and description. 1108 """ 1109 if debug: 1110 print('exchange', f'debug={debug}') 1111 if created is None: 1112 created = self.time() 1113 no_lock = self.nolock() 1114 self.lock() 1115 if rate is not None: 1116 if rate <= 0: 1117 return dict() 1118 if account not in self._vault['exchange']: 1119 self._vault['exchange'][account] = {} 1120 if len(self._vault['exchange'][account]) == 0 and rate <= 1: 1121 return {"time": created, "rate": 1, "description": None} 1122 self._vault['exchange'][account][created] = {"rate": rate, "description": description} 1123 self._step(Action.EXCHANGE, account, ref=created, value=rate) 1124 if no_lock: 1125 self.free(self.lock()) 1126 if debug: 1127 print("exchange-created-1", 1128 f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1129 1130 if account in self._vault['exchange']: 1131 valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created] 1132 if valid_rates: 1133 latest_rate = max(valid_rates, key=lambda x: x[0]) 1134 if debug: 1135 print("exchange-read-1", 1136 f'account: {account}, created: {created}, rate:{rate}, description:{description}', 1137 'latest_rate', latest_rate) 1138 result = latest_rate[1] 1139 result['time'] = latest_rate[0] 1140 return result # إرجاع قاموس يحتوي على المعدل والوصف 1141 if debug: 1142 print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1143 return {"time": created, "rate": 1, "description": None} # إرجاع القيمة الافتراضية مع وصف فارغ 1144 1145 @staticmethod 1146 def exchange_calc(x: float, x_rate: float, y_rate: float) -> float: 1147 """ 1148 This function calculates the exchanged amount of a currency. 1149 1150 Args: 1151 x (float): The original amount of the currency. 1152 x_rate (float): The exchange rate of the original currency. 1153 y_rate (float): The exchange rate of the target currency. 1154 1155 Returns: 1156 float: The exchanged amount of the target currency. 1157 """ 1158 return (x * x_rate) / y_rate 1159 1160 def exchanges(self) -> dict: 1161 """ 1162 Retrieve the recorded exchange rates for all accounts. 1163 1164 Parameters: 1165 None 1166 1167 Returns: 1168 dict: A dictionary containing all recorded exchange rates. 1169 The keys are account names or numbers, and the values are dictionaries containing the exchange rates. 1170 Each exchange rate dictionary has timestamps as keys and exchange rate details as values. 1171 """ 1172 return self._vault['exchange'].copy() 1173 1174 def accounts(self) -> dict: 1175 """ 1176 Returns a dictionary containing account numbers as keys and their respective balances as values. 1177 1178 Parameters: 1179 None 1180 1181 Returns: 1182 dict: A dictionary where keys are account numbers and values are their respective balances. 1183 """ 1184 result = {} 1185 for i in self._vault['account']: 1186 result[i] = self._vault['account'][i]['balance'] 1187 return result 1188 1189 def boxes(self, account) -> dict: 1190 """ 1191 Retrieve the boxes (transactions) associated with a specific account. 1192 1193 Parameters: 1194 account (str): The account number for which to retrieve the boxes. 1195 1196 Returns: 1197 dict: A dictionary containing the boxes associated with the given account. 1198 If the account does not exist, an empty dictionary is returned. 1199 """ 1200 if self.account_exists(account): 1201 return self._vault['account'][account]['box'] 1202 return {} 1203 1204 def logs(self, account) -> dict: 1205 """ 1206 Retrieve the logs (transactions) associated with a specific account. 1207 1208 Parameters: 1209 account (str): The account number for which to retrieve the logs. 1210 1211 Returns: 1212 dict: A dictionary containing the logs associated with the given account. 1213 If the account does not exist, an empty dictionary is returned. 1214 """ 1215 if self.account_exists(account): 1216 return self._vault['account'][account]['log'] 1217 return {} 1218 1219 def add_file(self, account: str, ref: int, path: str) -> int: 1220 """ 1221 Adds a file reference to a specific transaction log entry in the vault. 1222 1223 Parameters: 1224 account (str): The account number associated with the transaction log. 1225 ref (int): The reference to the transaction log entry. 1226 path (str): The path of the file to be added. 1227 1228 Returns: 1229 int: The reference of the added file. If the account or transaction log entry does not exist, returns 0. 1230 """ 1231 if self.account_exists(account): 1232 if ref in self._vault['account'][account]['log']: 1233 file_ref = self.time() 1234 self._vault['account'][account]['log'][ref]['file'][file_ref] = path 1235 no_lock = self.nolock() 1236 self.lock() 1237 self._step(Action.ADD_FILE, account, ref=ref, file=file_ref) 1238 if no_lock: 1239 self.free(self.lock()) 1240 return file_ref 1241 return 0 1242 1243 def remove_file(self, account: str, ref: int, file_ref: int) -> bool: 1244 """ 1245 Removes a file reference from a specific transaction log entry in the vault. 1246 1247 Parameters: 1248 account (str): The account number associated with the transaction log. 1249 ref (int): The reference to the transaction log entry. 1250 file_ref (int): The reference of the file to be removed. 1251 1252 Returns: 1253 bool: True if the file reference is successfully removed, False otherwise. 1254 """ 1255 if self.account_exists(account): 1256 if ref in self._vault['account'][account]['log']: 1257 if file_ref in self._vault['account'][account]['log'][ref]['file']: 1258 x = self._vault['account'][account]['log'][ref]['file'][file_ref] 1259 del self._vault['account'][account]['log'][ref]['file'][file_ref] 1260 no_lock = self.nolock() 1261 self.lock() 1262 self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x) 1263 if no_lock: 1264 self.free(self.lock()) 1265 return True 1266 return False 1267 1268 def balance(self, account: str = 1, cached: bool = True) -> int: 1269 """ 1270 Calculate and return the balance of a specific account. 1271 1272 Parameters: 1273 account (str): The account number. Default is '1'. 1274 cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True. 1275 1276 Returns: 1277 int: The balance of the account. 1278 1279 Note: 1280 If cached is True, the function returns the cached balance. 1281 If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items. 1282 """ 1283 if cached: 1284 return self._vault['account'][account]['balance'] 1285 x = 0 1286 return [x := x + y['rest'] for y in self._vault['account'][account]['box'].values()][-1] 1287 1288 def hide(self, account, status: bool = None) -> bool: 1289 """ 1290 Check or set the hide status of a specific account. 1291 1292 Parameters: 1293 account (str): The account number. 1294 status (bool, optional): The new hide status. If not provided, the function will return the current status. 1295 1296 Returns: 1297 bool: The current or updated hide status of the account. 1298 1299 Raises: 1300 None 1301 1302 Example: 1303 >>> tracker = ZakatTracker() 1304 >>> ref = tracker.track(51, 'desc', 'account1') 1305 >>> tracker.hide('account1') # Set the hide status of 'account1' to True 1306 False 1307 >>> tracker.hide('account1', True) # Set the hide status of 'account1' to True 1308 True 1309 >>> tracker.hide('account1') # Get the hide status of 'account1' by default 1310 True 1311 >>> tracker.hide('account1', False) 1312 False 1313 """ 1314 if self.account_exists(account): 1315 if status is None: 1316 return self._vault['account'][account]['hide'] 1317 self._vault['account'][account]['hide'] = status 1318 return status 1319 return False 1320 1321 def zakatable(self, account, status: bool = None) -> bool: 1322 """ 1323 Check or set the zakatable status of a specific account. 1324 1325 Parameters: 1326 account (str): The account number. 1327 status (bool, optional): The new zakatable status. If not provided, the function will return the current status. 1328 1329 Returns: 1330 bool: The current or updated zakatable status of the account. 1331 1332 Raises: 1333 None 1334 1335 Example: 1336 >>> tracker = ZakatTracker() 1337 >>> ref = tracker.track(51, 'desc', 'account1') 1338 >>> tracker.zakatable('account1') # Set the zakatable status of 'account1' to True 1339 True 1340 >>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True 1341 True 1342 >>> tracker.zakatable('account1') # Get the zakatable status of 'account1' by default 1343 True 1344 >>> tracker.zakatable('account1', False) 1345 False 1346 """ 1347 if self.account_exists(account): 1348 if status is None: 1349 return self._vault['account'][account]['zakatable'] 1350 self._vault['account'][account]['zakatable'] = status 1351 return status 1352 return False 1353 1354 def sub(self, unscaled_value: float | int | Decimal, desc: str = '', account: str = 1, created: int = None, 1355 debug: bool = False) \ 1356 -> tuple[ 1357 int, 1358 list[ 1359 tuple[int, int], 1360 ], 1361 ] | tuple: 1362 """ 1363 Subtracts a specified value from an account's balance. 1364 1365 Parameters: 1366 unscaled_value (float | int | Decimal): The amount to be subtracted. 1367 desc (str): A description for the transaction. Defaults to an empty string. 1368 account (str): The account from which the value will be subtracted. Defaults to '1'. 1369 created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. 1370 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1371 1372 Returns: 1373 tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction. 1374 1375 If the amount to subtract is greater than the account's balance, 1376 the remaining amount will be transferred to a new transaction with a negative value. 1377 1378 Raises: 1379 ValueError: The box transaction happened again in the same nanosecond time. 1380 ValueError: The log transaction happened again in the same nanosecond time. 1381 """ 1382 if debug: 1383 print('sub', f'debug={debug}') 1384 if unscaled_value < 0: 1385 return tuple() 1386 if unscaled_value == 0: 1387 ref = self.track(unscaled_value, '', account) 1388 return ref, ref 1389 if created is None: 1390 created = self.time() 1391 no_lock = self.nolock() 1392 self.lock() 1393 self.track(0, '', account) 1394 value = self.scale(unscaled_value) 1395 self._log(value=-value, desc=desc, account=account, created=created, ref=None, debug=debug) 1396 ids = sorted(self._vault['account'][account]['box'].keys()) 1397 limit = len(ids) + 1 1398 target = value 1399 if debug: 1400 print('ids', ids) 1401 ages = [] 1402 for i in range(-1, -limit, -1): 1403 if target == 0: 1404 break 1405 j = ids[i] 1406 if debug: 1407 print('i', i, 'j', j) 1408 rest = self._vault['account'][account]['box'][j]['rest'] 1409 if rest >= target: 1410 self._vault['account'][account]['box'][j]['rest'] -= target 1411 self._step(Action.SUB, account, ref=j, value=target) 1412 ages.append((j, target)) 1413 target = 0 1414 break 1415 elif target > rest > 0: 1416 chunk = rest 1417 target -= chunk 1418 self._step(Action.SUB, account, ref=j, value=chunk) 1419 ages.append((j, chunk)) 1420 self._vault['account'][account]['box'][j]['rest'] = 0 1421 if target > 0: 1422 self.track( 1423 unscaled_value=self.unscale(-target), 1424 desc=desc, 1425 account=account, 1426 logging=False, 1427 created=created, 1428 ) 1429 ages.append((created, target)) 1430 if no_lock: 1431 self.free(self.lock()) 1432 return created, ages 1433 1434 def transfer(self, unscaled_amount: float | int | Decimal, from_account: str, to_account: str, desc: str = '', 1435 created: int = None, 1436 debug: bool = False) -> list[int]: 1437 """ 1438 Transfers a specified value from one account to another. 1439 1440 Parameters: 1441 unscaled_amount (float | int | Decimal): The amount to be transferred. 1442 from_account (str): The account from which the value will be transferred. 1443 to_account (str): The account to which the value will be transferred. 1444 desc (str, optional): A description for the transaction. Defaults to an empty string. 1445 created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. 1446 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1447 1448 Returns: 1449 list[int]: A list of timestamps corresponding to the transactions made during the transfer. 1450 1451 Raises: 1452 ValueError: Transfer to the same account is forbidden. 1453 ValueError: The box transaction happened again in the same nanosecond time. 1454 ValueError: The log transaction happened again in the same nanosecond time. 1455 """ 1456 if debug: 1457 print('transfer', f'debug={debug}') 1458 if from_account == to_account: 1459 raise ValueError(f'Transfer to the same account is forbidden. {to_account}') 1460 if unscaled_amount <= 0: 1461 return [] 1462 if created is None: 1463 created = self.time() 1464 (_, ages) = self.sub(unscaled_amount, desc, from_account, created, debug=debug) 1465 times = [] 1466 source_exchange = self.exchange(from_account, created) 1467 target_exchange = self.exchange(to_account, created) 1468 1469 if debug: 1470 print('ages', ages) 1471 1472 for age, value in ages: 1473 target_amount = int(self.exchange_calc(value, source_exchange['rate'], target_exchange['rate'])) 1474 if debug: 1475 print('target_amount', target_amount) 1476 # Perform the transfer 1477 if self.box_exists(to_account, age): 1478 if debug: 1479 print('box_exists', age) 1480 capital = self._vault['account'][to_account]['box'][age]['capital'] 1481 rest = self._vault['account'][to_account]['box'][age]['rest'] 1482 if debug: 1483 print( 1484 f"Transfer(loop) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1485 selected_age = age 1486 if rest + target_amount > capital: 1487 self._vault['account'][to_account]['box'][age]['capital'] += target_amount 1488 selected_age = ZakatTracker.time() 1489 self._vault['account'][to_account]['box'][age]['rest'] += target_amount 1490 self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount) 1491 y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account, 1492 created=None, ref=None, debug=debug) 1493 times.append((age, y)) 1494 continue 1495 if debug: 1496 print( 1497 f"Transfer(func) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1498 y = self.track( 1499 unscaled_value=self.unscale(int(target_amount)), 1500 desc=desc, 1501 account=to_account, 1502 logging=True, 1503 created=age, 1504 debug=debug, 1505 ) 1506 times.append(y) 1507 return times 1508 1509 def check(self, silver_gram_price: float, unscaled_nisab: float | int | Decimal = None, debug: bool = False, now: int = None, 1510 cycle: float = None) -> tuple: 1511 """ 1512 Check the eligibility for Zakat based on the given parameters. 1513 1514 Parameters: 1515 silver_gram_price (float): The price of a gram of silver. 1516 unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, 1517 it will be calculated based on the silver_gram_price. 1518 debug (bool): Flag to enable debug mode. 1519 now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). 1520 cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle(). 1521 1522 Returns: 1523 tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, 1524 and a dictionary containing the Zakat plan. 1525 """ 1526 if debug: 1527 print('check', f'debug={debug}') 1528 if now is None: 1529 now = self.time() 1530 if cycle is None: 1531 cycle = ZakatTracker.TimeCycle() 1532 if unscaled_nisab is None: 1533 unscaled_nisab = ZakatTracker.Nisab(silver_gram_price) 1534 nisab = self.scale(unscaled_nisab) 1535 plan = {} 1536 below_nisab = 0 1537 brief = [0, 0, 0] 1538 valid = False 1539 if debug: 1540 print('exchanges', self.exchanges()) 1541 for x in self._vault['account']: 1542 if not self.zakatable(x): 1543 continue 1544 _box = self._vault['account'][x]['box'] 1545 _log = self._vault['account'][x]['log'] 1546 limit = len(_box) + 1 1547 ids = sorted(self._vault['account'][x]['box'].keys()) 1548 for i in range(-1, -limit, -1): 1549 j = ids[i] 1550 rest = float(_box[j]['rest']) 1551 if rest <= 0: 1552 continue 1553 exchange = self.exchange(x, created=self.time()) 1554 rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1) 1555 brief[0] += rest 1556 index = limit + i - 1 1557 epoch = (now - j) / cycle 1558 if debug: 1559 print(f"Epoch: {epoch}", _box[j]) 1560 if _box[j]['last'] > 0: 1561 epoch = (now - _box[j]['last']) / cycle 1562 if debug: 1563 print(f"Epoch: {epoch}") 1564 epoch = floor(epoch) 1565 if debug: 1566 print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch) 1567 if epoch == 0: 1568 continue 1569 if debug: 1570 print("Epoch - PASSED") 1571 brief[1] += rest 1572 if rest >= nisab: 1573 total = 0 1574 for _ in range(epoch): 1575 total += ZakatTracker.ZakatCut(float(rest) - float(total)) 1576 if total > 0: 1577 if x not in plan: 1578 plan[x] = {} 1579 valid = True 1580 brief[2] += total 1581 plan[x][index] = { 1582 'total': total, 1583 'count': epoch, 1584 'box_time': j, 1585 'box_capital': _box[j]['capital'], 1586 'box_rest': _box[j]['rest'], 1587 'box_last': _box[j]['last'], 1588 'box_total': _box[j]['total'], 1589 'box_count': _box[j]['count'], 1590 'box_log': _log[j]['desc'], 1591 'exchange_rate': exchange['rate'], 1592 'exchange_time': exchange['time'], 1593 'exchange_desc': exchange['description'], 1594 } 1595 else: 1596 chunk = ZakatTracker.ZakatCut(float(rest)) 1597 if chunk > 0: 1598 if x not in plan: 1599 plan[x] = {} 1600 if j not in plan[x].keys(): 1601 plan[x][index] = {} 1602 below_nisab += rest 1603 brief[2] += chunk 1604 plan[x][index]['below_nisab'] = chunk 1605 plan[x][index]['total'] = chunk 1606 plan[x][index]['count'] = epoch 1607 plan[x][index]['box_time'] = j 1608 plan[x][index]['box_capital'] = _box[j]['capital'] 1609 plan[x][index]['box_rest'] = _box[j]['rest'] 1610 plan[x][index]['box_last'] = _box[j]['last'] 1611 plan[x][index]['box_total'] = _box[j]['total'] 1612 plan[x][index]['box_count'] = _box[j]['count'] 1613 plan[x][index]['box_log'] = _log[j]['desc'] 1614 plan[x][index]['exchange_rate'] = exchange['rate'] 1615 plan[x][index]['exchange_time'] = exchange['time'] 1616 plan[x][index]['exchange_desc'] = exchange['description'] 1617 valid = valid or below_nisab >= nisab 1618 if debug: 1619 print(f"below_nisab({below_nisab}) >= nisab({nisab})") 1620 return valid, brief, plan 1621 1622 def build_payment_parts(self, demand: float, positive_only: bool = True) -> dict: 1623 """ 1624 Build payment parts for the Zakat distribution. 1625 1626 Parameters: 1627 demand (float): The total demand for payment in local currency. 1628 positive_only (bool): If True, only consider accounts with positive balance. Default is True. 1629 1630 Returns: 1631 dict: A dictionary containing the payment parts for each account. The dictionary has the following structure: 1632 { 1633 'account': { 1634 'account_id': {'balance': float, 'rate': float, 'part': float}, 1635 ... 1636 }, 1637 'exceed': bool, 1638 'demand': float, 1639 'total': float, 1640 } 1641 """ 1642 total = 0 1643 parts = { 1644 'account': {}, 1645 'exceed': False, 1646 'demand': demand, 1647 } 1648 for x, y in self.accounts().items(): 1649 if positive_only and y <= 0: 1650 continue 1651 total += float(y) 1652 exchange = self.exchange(x) 1653 parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0} 1654 parts['total'] = total 1655 return parts 1656 1657 @staticmethod 1658 def check_payment_parts(parts: dict, debug: bool = False) -> int: 1659 """ 1660 Checks the validity of payment parts. 1661 1662 Parameters: 1663 parts (dict): A dictionary containing payment parts information. 1664 debug (bool): Flag to enable debug mode. 1665 1666 Returns: 1667 int: Returns 0 if the payment parts are valid, otherwise returns the error code. 1668 1669 Error Codes: 1670 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 1671 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 1672 3: 'part' value in parts['account'][x] is less than 0. 1673 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 1674 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 1675 6: The sum of 'part' values in parts['account'] does not match with 'demand' value. 1676 """ 1677 if debug: 1678 print('check_payment_parts', f'debug={debug}') 1679 for i in ['demand', 'account', 'total', 'exceed']: 1680 if i not in parts: 1681 return 1 1682 exceed = parts['exceed'] 1683 for x in parts['account']: 1684 for j in ['balance', 'rate', 'part']: 1685 if j not in parts['account'][x]: 1686 return 2 1687 if parts['account'][x]['part'] < 0: 1688 return 3 1689 if not exceed and parts['account'][x]['balance'] <= 0: 1690 return 4 1691 demand = parts['demand'] 1692 z = 0 1693 for _, y in parts['account'].items(): 1694 if not exceed and y['part'] > y['balance']: 1695 return 5 1696 z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1) 1697 z = round(z, 2) 1698 demand = round(demand, 2) 1699 if debug: 1700 print('check_payment_parts', f'z = {z}, demand = {demand}') 1701 print('check_payment_parts', type(z), type(demand)) 1702 print('check_payment_parts', z != demand) 1703 print('check_payment_parts', str(z) != str(demand)) 1704 if z != demand and str(z) != str(demand): 1705 return 6 1706 return 0 1707 1708 def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool: 1709 """ 1710 Perform Zakat calculation based on the given report and optional parts. 1711 1712 Parameters: 1713 report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. 1714 parts (dict): A dictionary containing the payment parts for the zakat. 1715 debug (bool): A flag indicating whether to print debug information. 1716 1717 Returns: 1718 bool: True if the zakat calculation is successful, False otherwise. 1719 """ 1720 if debug: 1721 print('zakat', f'debug={debug}') 1722 valid, _, plan = report 1723 if not valid: 1724 return valid 1725 parts_exist = parts is not None 1726 if parts_exist: 1727 if self.check_payment_parts(parts, debug=debug) != 0: 1728 return False 1729 if debug: 1730 print('######### zakat #######') 1731 print('parts_exist', parts_exist) 1732 no_lock = self.nolock() 1733 self.lock() 1734 report_time = self.time() 1735 self._vault['report'][report_time] = report 1736 self._step(Action.REPORT, ref=report_time) 1737 created = self.time() 1738 for x in plan: 1739 target_exchange = self.exchange(x) 1740 if debug: 1741 print(plan[x]) 1742 print('-------------') 1743 print(self._vault['account'][x]['box']) 1744 ids = sorted(self._vault['account'][x]['box'].keys()) 1745 if debug: 1746 print('plan[x]', plan[x]) 1747 for i in plan[x].keys(): 1748 j = ids[i] 1749 if debug: 1750 print('i', i, 'j', j) 1751 self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'], 1752 key='last', 1753 math_operation=MathOperation.EQUAL) 1754 self._vault['account'][x]['box'][j]['last'] = created 1755 amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate'])) 1756 self._vault['account'][x]['box'][j]['total'] += amount 1757 self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total', 1758 math_operation=MathOperation.ADDITION) 1759 self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count'] 1760 self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count', 1761 math_operation=MathOperation.ADDITION) 1762 if not parts_exist: 1763 try: 1764 self._vault['account'][x]['box'][j]['rest'] -= amount 1765 except TypeError: 1766 self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount) 1767 # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest', 1768 # math_operation=MathOperation.SUBTRACTION) 1769 self._log(-float(amount), desc='zakat-زكاة', account=x, created=None, ref=j, debug=debug) 1770 if parts_exist: 1771 for account, part in parts['account'].items(): 1772 if part['part'] == 0: 1773 continue 1774 if debug: 1775 print('zakat-part', account, part['rate']) 1776 target_exchange = self.exchange(account) 1777 amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate']) 1778 self.sub(amount, desc='zakat-part-دفعة-زكاة', account=account, debug=debug) 1779 if no_lock: 1780 self.free(self.lock()) 1781 return True 1782 1783 def export_json(self, path: str = "data.json") -> bool: 1784 """ 1785 Exports the current state of the ZakatTracker object to a JSON file. 1786 1787 Parameters: 1788 path (str): The path where the JSON file will be saved. Default is "data.json". 1789 1790 Returns: 1791 bool: True if the export is successful, False otherwise. 1792 1793 Raises: 1794 No specific exceptions are raised by this method. 1795 """ 1796 with open(path, "w") as file: 1797 json.dump(self._vault, file, indent=4, cls=JSONEncoder) 1798 return True 1799 1800 def save(self, path: str = None) -> bool: 1801 """ 1802 Saves the ZakatTracker's current state to a camel file. 1803 1804 This method serializes the internal data (`_vault`). 1805 1806 Parameters: 1807 path (str, optional): File path for saving. Defaults to a predefined location. 1808 1809 Returns: 1810 bool: True if the save operation is successful, False otherwise. 1811 """ 1812 if path is None: 1813 path = self.path() 1814 with open(f'{path}.tmp', 'w') as stream: 1815 # first save in tmp file 1816 stream.write(camel.dump(self._vault)) 1817 # then move tmp file to original location 1818 shutil.move(f'{path}.tmp', path) 1819 return True 1820 1821 def load(self, path: str = None) -> bool: 1822 """ 1823 Load the current state of the ZakatTracker object from a camel file. 1824 1825 Parameters: 1826 path (str): The path where the camel file is located. If not provided, it will use the default path. 1827 1828 Returns: 1829 bool: True if the load operation is successful, False otherwise. 1830 """ 1831 if path is None: 1832 path = self.path() 1833 if os.path.exists(path): 1834 with open(path, 'r') as stream: 1835 self._vault = camel.load(stream.read()) 1836 return True 1837 return False 1838 1839 def import_csv_cache_path(self): 1840 """ 1841 Generates the cache file path for imported CSV data. 1842 1843 This function constructs the file path where cached data from CSV imports 1844 will be stored. The cache file is a camel file (.camel extension) appended 1845 to the base path of the object. 1846 1847 Returns: 1848 str: The full path to the import CSV cache file. 1849 1850 Example: 1851 >>> obj = ZakatTracker('/data/reports') 1852 >>> obj.import_csv_cache_path() 1853 '/data/reports.import_csv.camel' 1854 """ 1855 path = str(self.path()) 1856 ext = self.ext() 1857 ext_len = len(ext) 1858 if path.endswith(f'.{ext}'): 1859 path = path[:-ext_len-1] 1860 _, filename = os.path.split(path + f'.import_csv.{ext}') 1861 return self.base_path(filename) 1862 1863 def import_csv(self, path: str = 'file.csv', debug: bool = False) -> tuple: 1864 """ 1865 The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system. 1866 1867 Parameters: 1868 path (str): The path to the CSV file. Default is 'file.csv'. 1869 debug (bool): A flag indicating whether to print debug information. 1870 1871 Returns: 1872 tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, 1873 and a dictionary of bad transactions. 1874 1875 Notes: 1876 * Currency Pair Assumption: This function assumes that the exchange rates stored for each account 1877 are appropriate for the currency pairs involved in the conversions. 1878 * The exchange rate for each account is based on the last encountered transaction rate that is not equal 1879 to 1.0 or the previous rate for that account. 1880 * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent 1881 transactions of the same account within the whole imported and existing dataset when doing `check` and 1882 `zakat` operations. 1883 1884 Example Usage: 1885 The CSV file should have the following format, rate is optional per transaction: 1886 account, desc, value, date, rate 1887 For example: 1888 safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1 1889 """ 1890 if debug: 1891 print('import_csv', f'debug={debug}') 1892 cache: list[int] = [] 1893 try: 1894 with open(self.import_csv_cache_path(), 'r') as stream: 1895 cache = camel.load(stream.read()) 1896 except: 1897 pass 1898 date_formats = [ 1899 "%Y-%m-%d %H:%M:%S", 1900 "%Y-%m-%dT%H:%M:%S", 1901 "%Y-%m-%dT%H%M%S", 1902 "%Y-%m-%d", 1903 ] 1904 created, found, bad = 0, 0, {} 1905 data: dict[int, list] = {} 1906 with open(path, newline='', encoding="utf-8") as f: 1907 i = 0 1908 for row in csv.reader(f, delimiter=','): 1909 i += 1 1910 hashed = hash(tuple(row)) 1911 if hashed in cache: 1912 found += 1 1913 continue 1914 account = row[0] 1915 desc = row[1] 1916 value = float(row[2]) 1917 rate = 1.0 1918 if row[4:5]: # Empty list if index is out of range 1919 rate = float(row[4]) 1920 date: int = 0 1921 for time_format in date_formats: 1922 try: 1923 date = self.time(datetime.datetime.strptime(row[3], time_format)) 1924 break 1925 except: 1926 pass 1927 # TODO: not allowed for negative dates in the future after enhance time functions 1928 if date == 0: 1929 bad[i] = row + ['invalid date'] 1930 if value == 0: 1931 bad[i] = row + ['invalid value'] 1932 continue 1933 if date not in data: 1934 data[date] = [] 1935 data[date].append((i, account, desc, value, date, rate, hashed)) 1936 1937 if debug: 1938 print('import_csv', len(data)) 1939 1940 if bad: 1941 return created, found, bad 1942 1943 for date, rows in sorted(data.items()): 1944 try: 1945 len_rows = len(rows) 1946 if len_rows == 1: 1947 (_, account, desc, value, date, rate, hashed) = rows[0] 1948 if rate > 0: 1949 self.exchange(account, created=date, rate=rate) 1950 if value > 0: 1951 self.track(value, desc, account, True, date) 1952 elif value < 0: 1953 self.sub(-value, desc, account, date) 1954 created += 1 1955 cache.append(hashed) 1956 continue 1957 if debug: 1958 print('-- Duplicated time detected', date, 'len', len_rows) 1959 print(rows) 1960 print('---------------------------------') 1961 # If records are found at the same time with different accounts in the same amount 1962 # (one positive and the other negative), this indicates it is a transfer. 1963 if len_rows != 2: 1964 raise Exception(f'more than two transactions({len_rows}) at the same time') 1965 (i, account1, desc1, value1, date1, rate1, _) = rows[0] 1966 (j, account2, desc2, value2, date2, rate2, _) = rows[1] 1967 if account1 == account2 or desc1 != desc2 or abs(value1) != abs(value2) or date1 != date2: 1968 raise Exception('invalid transfer') 1969 if rate1 > 0: 1970 self.exchange(account1, created=date1, rate=rate1) 1971 if rate2 > 0: 1972 self.exchange(account2, created=date2, rate=rate2) 1973 values = { 1974 value1: account1, 1975 value2: account2, 1976 } 1977 self.transfer( 1978 unscaled_amount=abs(value1), 1979 from_account=values[min(values.keys())], 1980 to_account=values[max(values.keys())], 1981 desc=desc1, 1982 created=date1, 1983 ) 1984 except Exception as e: 1985 for (i, account, desc, value, date, rate, _) in rows: 1986 bad[i] = (account, desc, value, date, rate, e) 1987 break 1988 with open(self.import_csv_cache_path(), 'w') as stream: 1989 stream.write(camel.dump(cache)) 1990 return created, found, bad 1991 1992 ######## 1993 # TESTS # 1994 ####### 1995 1996 @staticmethod 1997 def human_readable_size(size: float, decimal_places: int = 2) -> str: 1998 """ 1999 Converts a size in bytes to a human-readable format (e.g., KB, MB, GB). 2000 2001 This function iterates through progressively larger units of information 2002 (B, KB, MB, GB, etc.) and divides the input size until it fits within a 2003 range that can be expressed with a reasonable number before the unit. 2004 2005 Parameters: 2006 size (float): The size in bytes to convert. 2007 decimal_places (int, optional): The number of decimal places to display 2008 in the result. Defaults to 2. 2009 2010 Returns: 2011 str: A string representation of the size in a human-readable format, 2012 rounded to the specified number of decimal places. For example: 2013 - "1.50 KB" (1536 bytes) 2014 - "23.00 MB" (24117248 bytes) 2015 - "1.23 GB" (1325899906 bytes) 2016 """ 2017 if type(size) not in (float, int): 2018 raise TypeError("size must be a float or integer") 2019 if type(decimal_places) != int: 2020 raise TypeError("decimal_places must be an integer") 2021 for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: 2022 if size < 1024.0: 2023 break 2024 size /= 1024.0 2025 return f"{size:.{decimal_places}f} {unit}" 2026 2027 @staticmethod 2028 def get_dict_size(obj: dict, seen: set = None) -> float: 2029 """ 2030 Recursively calculates the approximate memory size of a dictionary and its contents in bytes. 2031 2032 This function traverses the dictionary structure, accounting for the size of keys, values, 2033 and any nested objects. It handles various data types commonly found in dictionaries 2034 (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case 2035 of circular references. 2036 2037 Parameters: 2038 obj (dict): The dictionary whose size is to be calculated. 2039 seen (set, optional): A set used internally to track visited objects 2040 and avoid circular references. Defaults to None. 2041 2042 Returns: 2043 float: An approximate size of the dictionary and its contents in bytes. 2044 2045 Note: 2046 - This function is a method of the `ZakatTracker` class and is likely used to 2047 estimate the memory footprint of data structures relevant to Zakat calculations. 2048 - The size calculation is approximate as it relies on `sys.getsizeof()`, which might 2049 not account for all memory overhead depending on the Python implementation. 2050 - Circular references are handled to prevent infinite recursion. 2051 - Basic numeric types (int, float, complex) are assumed to have fixed sizes. 2052 - String sizes are estimated based on character length and encoding. 2053 """ 2054 size = 0 2055 if seen is None: 2056 seen = set() 2057 2058 obj_id = id(obj) 2059 if obj_id in seen: 2060 return 0 2061 2062 seen.add(obj_id) 2063 size += sys.getsizeof(obj) 2064 2065 if isinstance(obj, dict): 2066 for k, v in obj.items(): 2067 size += ZakatTracker.get_dict_size(k, seen) 2068 size += ZakatTracker.get_dict_size(v, seen) 2069 elif isinstance(obj, (list, tuple, set, frozenset)): 2070 for item in obj: 2071 size += ZakatTracker.get_dict_size(item, seen) 2072 elif isinstance(obj, (int, float, complex)): # Handle numbers 2073 pass # Basic numbers have a fixed size, so nothing to add here 2074 elif isinstance(obj, str): # Handle strings 2075 size += len(obj) * sys.getsizeof(str().encode()) # Size per character in bytes 2076 return size 2077 2078 @staticmethod 2079 def duration_from_nanoseconds(ns: int, 2080 show_zeros_in_spoken_time: bool = False, 2081 spoken_time_separator=',', 2082 millennia: str = 'Millennia', 2083 century: str = 'Century', 2084 years: str = 'Years', 2085 days: str = 'Days', 2086 hours: str = 'Hours', 2087 minutes: str = 'Minutes', 2088 seconds: str = 'Seconds', 2089 milli_seconds: str = 'MilliSeconds', 2090 micro_seconds: str = 'MicroSeconds', 2091 nano_seconds: str = 'NanoSeconds', 2092 ) -> tuple: 2093 """ 2094 REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 2095 Convert NanoSeconds to Human Readable Time Format. 2096 A NanoSeconds is a unit of time in the International System of Units (SI) equal 2097 to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second. 2098 Its symbol is μs, sometimes simplified to us when Unicode is not available. 2099 A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond. 2100 2101 INPUT : ms (AKA: MilliSeconds) 2102 OUTPUT: tuple(string time_lapsed, string spoken_time) like format. 2103 OUTPUT Variables: time_lapsed, spoken_time 2104 2105 Example Input: duration_from_nanoseconds(ns) 2106 **"Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds"** 2107 Example Output: ('039:0001:047:325:05:02:03:456:789:012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds') 2108 duration_from_nanoseconds(1234567890123456789012) 2109 """ 2110 us, ns = divmod(ns, 1000) 2111 ms, us = divmod(us, 1000) 2112 s, ms = divmod(ms, 1000) 2113 m, s = divmod(s, 60) 2114 h, m = divmod(m, 60) 2115 d, h = divmod(h, 24) 2116 y, d = divmod(d, 365) 2117 c, y = divmod(y, 100) 2118 n, c = divmod(c, 10) 2119 time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}" 2120 spoken_time_part = [] 2121 if n > 0 or show_zeros_in_spoken_time: 2122 spoken_time_part.append(f"{n: 3d} {millennia}") 2123 if c > 0 or show_zeros_in_spoken_time: 2124 spoken_time_part.append(f"{c: 4d} {century}") 2125 if y > 0 or show_zeros_in_spoken_time: 2126 spoken_time_part.append(f"{y: 3d} {years}") 2127 if d > 0 or show_zeros_in_spoken_time: 2128 spoken_time_part.append(f"{d: 4d} {days}") 2129 if h > 0 or show_zeros_in_spoken_time: 2130 spoken_time_part.append(f"{h: 2d} {hours}") 2131 if m > 0 or show_zeros_in_spoken_time: 2132 spoken_time_part.append(f"{m: 2d} {minutes}") 2133 if s > 0 or show_zeros_in_spoken_time: 2134 spoken_time_part.append(f"{s: 2d} {seconds}") 2135 if ms > 0 or show_zeros_in_spoken_time: 2136 spoken_time_part.append(f"{ms: 3d} {milli_seconds}") 2137 if us > 0 or show_zeros_in_spoken_time: 2138 spoken_time_part.append(f"{us: 3d} {micro_seconds}") 2139 if ns > 0 or show_zeros_in_spoken_time: 2140 spoken_time_part.append(f"{ns: 3d} {nano_seconds}") 2141 return time_lapsed, spoken_time_separator.join(spoken_time_part) 2142 2143 @staticmethod 2144 def day_to_time(day: int, month: int = 6, year: int = 2024) -> int: # افتراض أن الشهر هو يونيو والسنة 2024 2145 """ 2146 Convert a specific day, month, and year into a timestamp. 2147 2148 Parameters: 2149 day (int): The day of the month. 2150 month (int): The month of the year. Default is 6 (June). 2151 year (int): The year. Default is 2024. 2152 2153 Returns: 2154 int: The timestamp representing the given day, month, and year. 2155 2156 Note: 2157 This method assumes the default month and year if not provided. 2158 """ 2159 return ZakatTracker.time(datetime.datetime(year, month, day)) 2160 2161 @staticmethod 2162 def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime: 2163 """ 2164 Generate a random date between two given dates. 2165 2166 Parameters: 2167 start_date (datetime.datetime): The start date from which to generate a random date. 2168 end_date (datetime.datetime): The end date until which to generate a random date. 2169 2170 Returns: 2171 datetime.datetime: A random date between the start_date and end_date. 2172 """ 2173 time_between_dates = end_date - start_date 2174 days_between_dates = time_between_dates.days 2175 random_number_of_days = random.randrange(days_between_dates) 2176 return start_date + datetime.timedelta(days=random_number_of_days) 2177 2178 @staticmethod 2179 def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False, 2180 debug: bool = False) -> int: 2181 """ 2182 Generate a random CSV file with specified parameters. 2183 2184 Parameters: 2185 path (str): The path where the CSV file will be saved. Default is "data.csv". 2186 count (int): The number of rows to generate in the CSV file. Default is 1000. 2187 with_rate (bool): If True, a random rate between 1.2% and 12% is added. Default is False. 2188 debug (bool): A flag indicating whether to print debug information. 2189 2190 Returns: 2191 None. The function generates a CSV file at the specified path with the given count of rows. 2192 Each row contains a randomly generated account, description, value, and date. 2193 The value is randomly generated between 1000 and 100000, 2194 and the date is randomly generated between 1950-01-01 and 2023-12-31. 2195 If the row number is not divisible by 13, the value is multiplied by -1. 2196 """ 2197 if debug: 2198 print('generate_random_csv_file', f'debug={debug}') 2199 i = 0 2200 with open(path, "w", newline="") as csvfile: 2201 writer = csv.writer(csvfile) 2202 for i in range(count): 2203 account = f"acc-{random.randint(1, 1000)}" 2204 desc = f"Some text {random.randint(1, 1000)}" 2205 value = random.randint(1000, 100000) 2206 date = ZakatTracker.generate_random_date(datetime.datetime(1000, 1, 1), 2207 datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S") 2208 if not i % 13 == 0: 2209 value *= -1 2210 row = [account, desc, value, date] 2211 if with_rate: 2212 rate = random.randint(1, 100) * 0.12 2213 if debug: 2214 print('before-append', row) 2215 row.append(rate) 2216 if debug: 2217 print('after-append', row) 2218 writer.writerow(row) 2219 i = i + 1 2220 return i 2221 2222 @staticmethod 2223 def create_random_list(max_sum, min_value=0, max_value=10): 2224 """ 2225 Creates a list of random integers whose sum does not exceed the specified maximum. 2226 2227 Args: 2228 max_sum: The maximum allowed sum of the list elements. 2229 min_value: The minimum possible value for an element (inclusive). 2230 max_value: The maximum possible value for an element (inclusive). 2231 2232 Returns: 2233 A list of random integers. 2234 """ 2235 result = [] 2236 current_sum = 0 2237 2238 while current_sum < max_sum: 2239 # Calculate the remaining space for the next element 2240 remaining_sum = max_sum - current_sum 2241 # Determine the maximum possible value for the next element 2242 next_max_value = min(remaining_sum, max_value) 2243 # Generate a random element within the allowed range 2244 next_element = random.randint(min_value, next_max_value) 2245 result.append(next_element) 2246 current_sum += next_element 2247 2248 return result 2249 2250 def _test_core(self, restore=False, debug=False): 2251 2252 if debug: 2253 random.seed(1234567890) 2254 2255 # sanity check - random forward time 2256 2257 xlist = [] 2258 limit = 1000 2259 for _ in range(limit): 2260 y = ZakatTracker.time() 2261 z = '-' 2262 if y not in xlist: 2263 xlist.append(y) 2264 else: 2265 z = 'x' 2266 if debug: 2267 print(z, y) 2268 xx = len(xlist) 2269 if debug: 2270 print('count', xx, ' - unique: ', (xx / limit) * 100, '%') 2271 assert limit == xx 2272 2273 # sanity check - convert date since 1000AD 2274 2275 for year in range(1000, 9000): 2276 ns = ZakatTracker.time(datetime.datetime.strptime(f"{year}-12-30 18:30:45", "%Y-%m-%d %H:%M:%S")) 2277 date = ZakatTracker.time_to_datetime(ns) 2278 if debug: 2279 print(date) 2280 assert date.year == year 2281 assert date.month == 12 2282 assert date.day == 30 2283 assert date.hour == 18 2284 assert date.minute == 30 2285 assert date.second in [44, 45] 2286 2287 # human_readable_size 2288 2289 assert ZakatTracker.human_readable_size(0) == "0.00 B" 2290 assert ZakatTracker.human_readable_size(512) == "512.00 B" 2291 assert ZakatTracker.human_readable_size(1023) == "1023.00 B" 2292 2293 assert ZakatTracker.human_readable_size(1024) == "1.00 KB" 2294 assert ZakatTracker.human_readable_size(2048) == "2.00 KB" 2295 assert ZakatTracker.human_readable_size(5120) == "5.00 KB" 2296 2297 assert ZakatTracker.human_readable_size(1024 ** 2) == "1.00 MB" 2298 assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2) == "2.50 MB" 2299 2300 assert ZakatTracker.human_readable_size(1024 ** 3) == "1.00 GB" 2301 assert ZakatTracker.human_readable_size(1024 ** 4) == "1.00 TB" 2302 assert ZakatTracker.human_readable_size(1024 ** 5) == "1.00 PB" 2303 2304 assert ZakatTracker.human_readable_size(1536, decimal_places=0) == "2 KB" 2305 assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2, decimal_places=1) == "2.5 MB" 2306 assert ZakatTracker.human_readable_size(1234567890, decimal_places=3) == "1.150 GB" 2307 2308 try: 2309 # noinspection PyTypeChecker 2310 ZakatTracker.human_readable_size("not a number") 2311 assert False, "Expected TypeError for invalid input" 2312 except TypeError: 2313 pass 2314 2315 try: 2316 # noinspection PyTypeChecker 2317 ZakatTracker.human_readable_size(1024, decimal_places="not an int") 2318 assert False, "Expected TypeError for invalid decimal_places" 2319 except TypeError: 2320 pass 2321 2322 # get_dict_size 2323 assert ZakatTracker.get_dict_size({}) == sys.getsizeof({}), "Empty dictionary size mismatch" 2324 assert ZakatTracker.get_dict_size({"a": 1, "b": 2.5, "c": True}) != sys.getsizeof({}), "Not Empty dictionary" 2325 2326 # number scale 2327 error = 0 2328 total = 0 2329 for sign in ['', '-']: 2330 for max_i, max_j, decimal_places in [ 2331 (101, 101, 2), # fiat currency minimum unit took 2 decimal places 2332 (1, 1_000, 8), # cryptocurrency like Satoshi in Bitcoin took 8 decimal places 2333 (1, 1_000, 18) # cryptocurrency like Wei in Ethereum took 18 decimal places 2334 ]: 2335 for return_type in ( 2336 float, 2337 Decimal, 2338 ): 2339 for i in range(max_i): 2340 for j in range(max_j): 2341 total += 1 2342 num_str = f'{sign}{i}.{j:0{decimal_places}d}' 2343 num = return_type(num_str) 2344 scaled = self.scale(num, decimal_places=decimal_places) 2345 unscaled = self.unscale(scaled, return_type=return_type, decimal_places=decimal_places) 2346 if debug: 2347 print( 2348 f'return_type: {return_type}, num_str: {num_str} - num: {num} - scaled: {scaled} - unscaled: {unscaled}') 2349 if unscaled != num: 2350 if debug: 2351 print('***** SCALE ERROR *****') 2352 error += 1 2353 if debug: 2354 print(f'total: {total}, error({error}): {100 * error / total}%') 2355 assert error == 0 2356 2357 assert self.nolock() 2358 assert self._history() is True 2359 2360 table = { 2361 1: [ 2362 (0, 10, 1000, 1000, 1000, 1, 1), 2363 (0, 20, 3000, 3000, 3000, 2, 2), 2364 (0, 30, 6000, 6000, 6000, 3, 3), 2365 (1, 15, 4500, 4500, 4500, 3, 4), 2366 (1, 50, -500, -500, -500, 4, 5), 2367 (1, 100, -10500, -10500, -10500, 5, 6), 2368 ], 2369 'wallet': [ 2370 (1, 90, -9000, -9000, -9000, 1, 1), 2371 (0, 100, 1000, 1000, 1000, 2, 2), 2372 (1, 190, -18000, -18000, -18000, 3, 3), 2373 (0, 1000, 82000, 82000, 82000, 4, 4), 2374 ], 2375 } 2376 for x in table: 2377 for y in table[x]: 2378 self.lock() 2379 if y[0] == 0: 2380 ref = self.track( 2381 unscaled_value=y[1], 2382 desc='test-add', 2383 account=x, 2384 logging=True, 2385 created=ZakatTracker.time(), 2386 debug=debug, 2387 ) 2388 else: 2389 (ref, z) = self.sub( 2390 unscaled_value=y[1], 2391 desc='test-sub', 2392 account=x, 2393 created=ZakatTracker.time(), 2394 ) 2395 if debug: 2396 print('_sub', z, ZakatTracker.time()) 2397 assert ref != 0 2398 assert len(self._vault['account'][x]['log'][ref]['file']) == 0 2399 for i in range(3): 2400 file_ref = self.add_file(x, ref, 'file_' + str(i)) 2401 sleep(0.0000001) 2402 assert file_ref != 0 2403 if debug: 2404 print('ref', ref, 'file', file_ref) 2405 assert len(self._vault['account'][x]['log'][ref]['file']) == i + 1 2406 file_ref = self.add_file(x, ref, 'file_' + str(3)) 2407 assert self.remove_file(x, ref, file_ref) 2408 z = self.balance(x) 2409 if debug: 2410 print("debug-0", z, y) 2411 assert z == y[2] 2412 z = self.balance(x, False) 2413 if debug: 2414 print("debug-1", z, y[3]) 2415 assert z == y[3] 2416 o = self._vault['account'][x]['log'] 2417 z = 0 2418 for i in o: 2419 z += o[i]['value'] 2420 if debug: 2421 print("debug-2", z, type(z)) 2422 print("debug-2", y[4], type(y[4])) 2423 assert z == y[4] 2424 if debug: 2425 print('debug-2 - PASSED') 2426 assert self.box_size(x) == y[5] 2427 assert self.log_size(x) == y[6] 2428 assert not self.nolock() 2429 self.free(self.lock()) 2430 assert self.nolock() 2431 assert self.boxes(x) != {} 2432 assert self.logs(x) != {} 2433 2434 assert not self.hide(x) 2435 assert self.hide(x, False) is False 2436 assert self.hide(x) is False 2437 assert self.hide(x, True) 2438 assert self.hide(x) 2439 2440 assert self.zakatable(x) 2441 assert self.zakatable(x, False) is False 2442 assert self.zakatable(x) is False 2443 assert self.zakatable(x, True) 2444 assert self.zakatable(x) 2445 2446 if restore is True: 2447 count = len(self._vault['history']) 2448 if debug: 2449 print('history-count', count) 2450 assert count == 10 2451 # try mode 2452 for _ in range(count): 2453 assert self.recall(True, debug) 2454 count = len(self._vault['history']) 2455 if debug: 2456 print('history-count', count) 2457 assert count == 10 2458 _accounts = list(table.keys()) 2459 accounts_limit = len(_accounts) + 1 2460 for i in range(-1, -accounts_limit, -1): 2461 account = _accounts[i] 2462 if debug: 2463 print(account, len(table[account])) 2464 transaction_limit = len(table[account]) + 1 2465 for j in range(-1, -transaction_limit, -1): 2466 row = table[account][j] 2467 if debug: 2468 print(row, self.balance(account), self.balance(account, False)) 2469 assert self.balance(account) == self.balance(account, False) 2470 assert self.balance(account) == row[2] 2471 assert self.recall(False, debug) 2472 assert self.recall(False, debug) is False 2473 count = len(self._vault['history']) 2474 if debug: 2475 print('history-count', count) 2476 assert count == 0 2477 self.reset() 2478 2479 def test(self, debug: bool = False) -> bool: 2480 if debug: 2481 print('test', f'debug={debug}') 2482 try: 2483 2484 self._test_core(True, debug) 2485 self._test_core(False, debug) 2486 2487 assert self._history() 2488 2489 # Not allowed for duplicate transactions in the same account and time 2490 2491 created = ZakatTracker.time() 2492 self.track(100, 'test-1', 'same', True, created) 2493 failed = False 2494 try: 2495 self.track(50, 'test-1', 'same', True, created) 2496 except: 2497 failed = True 2498 assert failed is True 2499 2500 self.reset() 2501 2502 # Same account transfer 2503 for x in [1, 'a', True, 1.8, None]: 2504 failed = False 2505 try: 2506 self.transfer(1, x, x, 'same-account', debug=debug) 2507 except: 2508 failed = True 2509 assert failed is True 2510 2511 # Always preserve box age during transfer 2512 2513 series: list[tuple] = [ 2514 (30, 4), 2515 (60, 3), 2516 (90, 2), 2517 ] 2518 case = { 2519 3000: { 2520 'series': series, 2521 'rest': 15000, 2522 }, 2523 6000: { 2524 'series': series, 2525 'rest': 12000, 2526 }, 2527 9000: { 2528 'series': series, 2529 'rest': 9000, 2530 }, 2531 18000: { 2532 'series': series, 2533 'rest': 0, 2534 }, 2535 27000: { 2536 'series': series, 2537 'rest': -9000, 2538 }, 2539 36000: { 2540 'series': series, 2541 'rest': -18000, 2542 }, 2543 } 2544 2545 selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle() 2546 2547 for total in case: 2548 if debug: 2549 print('--------------------------------------------------------') 2550 print(f'case[{total}]', case[total]) 2551 for x in case[total]['series']: 2552 self.track( 2553 unscaled_value=x[0], 2554 desc=f"test-{x} ages", 2555 account='ages', 2556 logging=True, 2557 created=selected_time * x[1], 2558 ) 2559 2560 unscaled_total = self.unscale(total) 2561 if debug: 2562 print('unscaled_total', unscaled_total) 2563 refs = self.transfer( 2564 unscaled_amount=unscaled_total, 2565 from_account='ages', 2566 to_account='future', 2567 desc='Zakat Movement', 2568 debug=debug, 2569 ) 2570 2571 if debug: 2572 print('refs', refs) 2573 2574 ages_cache_balance = self.balance('ages') 2575 ages_fresh_balance = self.balance('ages', False) 2576 rest = case[total]['rest'] 2577 if debug: 2578 print('source', ages_cache_balance, ages_fresh_balance, rest) 2579 assert ages_cache_balance == rest 2580 assert ages_fresh_balance == rest 2581 2582 future_cache_balance = self.balance('future') 2583 future_fresh_balance = self.balance('future', False) 2584 if debug: 2585 print('target', future_cache_balance, future_fresh_balance, total) 2586 print('refs', refs) 2587 assert future_cache_balance == total 2588 assert future_fresh_balance == total 2589 2590 # TODO: check boxes times for `ages` should equal box times in `future` 2591 for ref in self._vault['account']['ages']['box']: 2592 ages_capital = self._vault['account']['ages']['box'][ref]['capital'] 2593 ages_rest = self._vault['account']['ages']['box'][ref]['rest'] 2594 future_capital = 0 2595 if ref in self._vault['account']['future']['box']: 2596 future_capital = self._vault['account']['future']['box'][ref]['capital'] 2597 future_rest = 0 2598 if ref in self._vault['account']['future']['box']: 2599 future_rest = self._vault['account']['future']['box'][ref]['rest'] 2600 if ages_capital != 0 and future_capital != 0 and future_rest != 0: 2601 if debug: 2602 print('================================================================') 2603 print('ages', ages_capital, ages_rest) 2604 print('future', future_capital, future_rest) 2605 if ages_rest == 0: 2606 assert ages_capital == future_capital 2607 elif ages_rest < 0: 2608 assert -ages_capital == future_capital 2609 elif ages_rest > 0: 2610 assert ages_capital == ages_rest + future_capital 2611 self.reset() 2612 assert len(self._vault['history']) == 0 2613 2614 assert self._history() 2615 assert self._history(False) is False 2616 assert self._history() is False 2617 assert self._history(True) 2618 assert self._history() 2619 if debug: 2620 print('####################################################################') 2621 2622 transaction = [ 2623 ( 2624 20, 'wallet', 1, -2000, -2000, -2000, 1, 1, 2625 2000, 2000, 2000, 1, 1, 2626 ), 2627 ( 2628 750, 'wallet', 'safe', -77000, -77000, -77000, 2, 2, 2629 75000, 75000, 75000, 1, 1, 2630 ), 2631 ( 2632 600, 'safe', 'bank', 15000, 15000, 15000, 1, 2, 2633 60000, 60000, 60000, 1, 1, 2634 ), 2635 ] 2636 for z in transaction: 2637 self.lock() 2638 x = z[1] 2639 y = z[2] 2640 self.transfer( 2641 unscaled_amount=z[0], 2642 from_account=x, 2643 to_account=y, 2644 desc='test-transfer', 2645 debug=debug, 2646 ) 2647 zz = self.balance(x) 2648 if debug: 2649 print(zz, z) 2650 assert zz == z[3] 2651 xx = self.accounts()[x] 2652 assert xx == z[3] 2653 assert self.balance(x, False) == z[4] 2654 assert xx == z[4] 2655 2656 s = 0 2657 log = self._vault['account'][x]['log'] 2658 for i in log: 2659 s += log[i]['value'] 2660 if debug: 2661 print('s', s, 'z[5]', z[5]) 2662 assert s == z[5] 2663 2664 assert self.box_size(x) == z[6] 2665 assert self.log_size(x) == z[7] 2666 2667 yy = self.accounts()[y] 2668 assert self.balance(y) == z[8] 2669 assert yy == z[8] 2670 assert self.balance(y, False) == z[9] 2671 assert yy == z[9] 2672 2673 s = 0 2674 log = self._vault['account'][y]['log'] 2675 for i in log: 2676 s += log[i]['value'] 2677 assert s == z[10] 2678 2679 assert self.box_size(y) == z[11] 2680 assert self.log_size(y) == z[12] 2681 assert self.free(self.lock()) 2682 2683 if debug: 2684 pp().pprint(self.check(2.17)) 2685 2686 assert not self.nolock() 2687 history_count = len(self._vault['history']) 2688 if debug: 2689 print('history-count', history_count) 2690 assert history_count == 4 2691 assert not self.free(ZakatTracker.time()) 2692 assert self.free(self.lock()) 2693 assert self.nolock() 2694 assert len(self._vault['history']) == 3 2695 2696 # storage 2697 2698 _path = self.path(f'./zakat_test_db/test.{self.ext()}') 2699 if os.path.exists(_path): 2700 os.remove(_path) 2701 self.save() 2702 assert os.path.getsize(_path) > 0 2703 self.reset() 2704 assert self.recall(False, debug) is False 2705 self.load() 2706 assert self._vault['account'] is not None 2707 2708 # recall 2709 2710 assert self.nolock() 2711 assert len(self._vault['history']) == 3 2712 assert self.recall(False, debug) is True 2713 assert len(self._vault['history']) == 2 2714 assert self.recall(False, debug) is True 2715 assert len(self._vault['history']) == 1 2716 assert self.recall(False, debug) is True 2717 assert len(self._vault['history']) == 0 2718 assert self.recall(False, debug) is False 2719 assert len(self._vault['history']) == 0 2720 2721 # exchange 2722 2723 self.exchange("cash", 25, 3.75, "2024-06-25") 2724 self.exchange("cash", 22, 3.73, "2024-06-22") 2725 self.exchange("cash", 15, 3.69, "2024-06-15") 2726 self.exchange("cash", 10, 3.66) 2727 2728 for i in range(1, 30): 2729 exchange = self.exchange("cash", i) 2730 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2731 if debug: 2732 print(i, rate, description, created) 2733 assert created 2734 if i < 10: 2735 assert rate == 1 2736 assert description is None 2737 elif i == 10: 2738 assert rate == 3.66 2739 assert description is None 2740 elif i < 15: 2741 assert rate == 3.66 2742 assert description is None 2743 elif i == 15: 2744 assert rate == 3.69 2745 assert description is not None 2746 elif i < 22: 2747 assert rate == 3.69 2748 assert description is not None 2749 elif i == 22: 2750 assert rate == 3.73 2751 assert description is not None 2752 elif i >= 25: 2753 assert rate == 3.75 2754 assert description is not None 2755 exchange = self.exchange("bank", i) 2756 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2757 if debug: 2758 print(i, rate, description, created) 2759 assert created 2760 assert rate == 1 2761 assert description is None 2762 2763 assert len(self._vault['exchange']) > 0 2764 assert len(self.exchanges()) > 0 2765 self._vault['exchange'].clear() 2766 assert len(self._vault['exchange']) == 0 2767 assert len(self.exchanges()) == 0 2768 2769 # حفظ أسعار الصرف باستخدام التواريخ بالنانو ثانية 2770 self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25") 2771 self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22") 2772 self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15") 2773 self.exchange("cash", ZakatTracker.day_to_time(10), 3.66) 2774 2775 for i in [x * 0.12 for x in range(-15, 21)]: 2776 if i <= 0: 2777 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0 2778 else: 2779 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0 2780 2781 # اختبار النتائج باستخدام التواريخ بالنانو ثانية 2782 for i in range(1, 31): 2783 timestamp_ns = ZakatTracker.day_to_time(i) 2784 exchange = self.exchange("cash", timestamp_ns) 2785 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2786 if debug: 2787 print(i, rate, description, created) 2788 assert created 2789 if i < 10: 2790 assert rate == 1 2791 assert description is None 2792 elif i == 10: 2793 assert rate == 3.66 2794 assert description is None 2795 elif i < 15: 2796 assert rate == 3.66 2797 assert description is None 2798 elif i == 15: 2799 assert rate == 3.69 2800 assert description is not None 2801 elif i < 22: 2802 assert rate == 3.69 2803 assert description is not None 2804 elif i == 22: 2805 assert rate == 3.73 2806 assert description is not None 2807 elif i >= 25: 2808 assert rate == 3.75 2809 assert description is not None 2810 exchange = self.exchange("bank", i) 2811 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2812 if debug: 2813 print(i, rate, description, created) 2814 assert created 2815 assert rate == 1 2816 assert description is None 2817 2818 # csv 2819 2820 csv_count = 1000 2821 2822 for with_rate, path in { 2823 False: 'test-import_csv-no-exchange', 2824 True: 'test-import_csv-with-exchange', 2825 }.items(): 2826 2827 if debug: 2828 print('test_import_csv', with_rate, path) 2829 2830 csv_path = path + '.csv' 2831 if os.path.exists(csv_path): 2832 os.remove(csv_path) 2833 c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug) 2834 if debug: 2835 print('generate_random_csv_file', c) 2836 assert c == csv_count 2837 assert os.path.getsize(csv_path) > 0 2838 cache_path = self.import_csv_cache_path() 2839 if os.path.exists(cache_path): 2840 os.remove(cache_path) 2841 self.reset() 2842 (created, found, bad) = self.import_csv(csv_path, debug) 2843 bad_count = len(bad) 2844 assert bad_count > 0 2845 if debug: 2846 print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})") 2847 print('bad', bad) 2848 tmp_size = os.path.getsize(cache_path) 2849 assert tmp_size > 0 2850 # TODO: assert created + found + bad_count == csv_count 2851 # TODO: assert created == csv_count 2852 # TODO: assert bad_count == 0 2853 (created_2, found_2, bad_2) = self.import_csv(csv_path) 2854 bad_2_count = len(bad_2) 2855 if debug: 2856 print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})") 2857 print('bad', bad) 2858 assert bad_2_count > 0 2859 # TODO: assert tmp_size == os.path.getsize(cache_path) 2860 # TODO: assert created_2 + found_2 + bad_2_count == csv_count 2861 # TODO: assert created == found_2 2862 # TODO: assert bad_count == bad_2_count 2863 # TODO: assert found_2 == csv_count 2864 # TODO: assert bad_2_count == 0 2865 # TODO: assert created_2 == 0 2866 2867 # payment parts 2868 2869 positive_parts = self.build_payment_parts(100, positive_only=True) 2870 assert self.check_payment_parts(positive_parts) != 0 2871 assert self.check_payment_parts(positive_parts) != 0 2872 all_parts = self.build_payment_parts(300, positive_only=False) 2873 assert self.check_payment_parts(all_parts) != 0 2874 assert self.check_payment_parts(all_parts) != 0 2875 if debug: 2876 pp().pprint(positive_parts) 2877 pp().pprint(all_parts) 2878 # dynamic discount 2879 suite = [] 2880 count = 3 2881 for exceed in [False, True]: 2882 case = [] 2883 for parts in [positive_parts, all_parts]: 2884 part = parts.copy() 2885 demand = part['demand'] 2886 if debug: 2887 print(demand, part['total']) 2888 i = 0 2889 z = demand / count 2890 cp = { 2891 'account': {}, 2892 'demand': demand, 2893 'exceed': exceed, 2894 'total': part['total'], 2895 } 2896 j = '' 2897 for x, y in part['account'].items(): 2898 x_exchange = self.exchange(x) 2899 zz = self.exchange_calc(z, 1, x_exchange['rate']) 2900 if exceed and zz <= demand: 2901 i += 1 2902 y['part'] = zz 2903 if debug: 2904 print(exceed, y) 2905 cp['account'][x] = y 2906 case.append(y) 2907 elif not exceed and y['balance'] >= zz: 2908 i += 1 2909 y['part'] = zz 2910 if debug: 2911 print(exceed, y) 2912 cp['account'][x] = y 2913 case.append(y) 2914 j = x 2915 if i >= count: 2916 break 2917 if len(cp['account'][j]) > 0: 2918 suite.append(cp) 2919 if debug: 2920 print('suite', len(suite)) 2921 # vault = self._vault.copy() 2922 for case in suite: 2923 # self._vault = vault.copy() 2924 if debug: 2925 print('case', case) 2926 result = self.check_payment_parts(case) 2927 if debug: 2928 print('check_payment_parts', result, f'exceed: {exceed}') 2929 assert result == 0 2930 2931 report = self.check(2.17, None, debug) 2932 (valid, brief, plan) = report 2933 if debug: 2934 print('valid', valid) 2935 zakat_result = self.zakat(report, parts=case, debug=debug) 2936 if debug: 2937 print('zakat-result', zakat_result) 2938 assert valid == zakat_result 2939 2940 assert self.save(path + f'.{self.ext()}') 2941 assert self.export_json(path + '.json') 2942 2943 assert self.export_json("1000-transactions-test.json") 2944 assert self.save(f"1000-transactions-test.{self.ext()}") 2945 2946 self.reset() 2947 2948 # test transfer between accounts with different exchange rate 2949 2950 a_SAR = "Bank (SAR)" 2951 b_USD = "Bank (USD)" 2952 c_SAR = "Safe (SAR)" 2953 # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer 2954 for case in [ 2955 (0, a_SAR, "SAR Gift", 1000, 100000), 2956 (1, a_SAR, 1), 2957 (0, b_USD, "USD Gift", 500, 50000), 2958 (1, b_USD, 1), 2959 (2, b_USD, 3.75), 2960 (1, b_USD, 3.75), 2961 (3, 100, b_USD, a_SAR, "100 USD -> SAR", 40000, 137500), 2962 (0, c_SAR, "Salary", 750, 75000), 2963 (3, 375, c_SAR, b_USD, "375 SAR -> USD", 37500, 50000), 2964 (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 137125, 50100), 2965 ]: 2966 if debug: 2967 print('case', case) 2968 match (case[0]): 2969 case 0: # track 2970 _, account, desc, x, balance = case 2971 self.track(unscaled_value=x, desc=desc, account=account, debug=debug) 2972 2973 cached_value = self.balance(account, cached=True) 2974 fresh_value = self.balance(account, cached=False) 2975 if debug: 2976 print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value) 2977 assert cached_value == balance 2978 assert fresh_value == balance 2979 case 1: # check-exchange 2980 _, account, expected_rate = case 2981 t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2982 if debug: 2983 print('t-exchange', t_exchange) 2984 assert t_exchange['rate'] == expected_rate 2985 case 2: # do-exchange 2986 _, account, rate = case 2987 self.exchange(account, rate=rate, debug=debug) 2988 b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2989 if debug: 2990 print('b-exchange', b_exchange) 2991 assert b_exchange['rate'] == rate 2992 case 3: # transfer 2993 _, x, a, b, desc, a_balance, b_balance = case 2994 self.transfer(x, a, b, desc, debug=debug) 2995 2996 cached_value = self.balance(a, cached=True) 2997 fresh_value = self.balance(a, cached=False) 2998 if debug: 2999 print('account', a, 'cached_value', cached_value, 'fresh_value', fresh_value, 'a_balance', a_balance) 3000 assert cached_value == a_balance 3001 assert fresh_value == a_balance 3002 3003 cached_value = self.balance(b, cached=True) 3004 fresh_value = self.balance(b, cached=False) 3005 if debug: 3006 print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value) 3007 assert cached_value == b_balance 3008 assert fresh_value == b_balance 3009 3010 # Transfer all in many chunks randomly from B to A 3011 a_SAR_balance = 137125 3012 b_USD_balance = 50100 3013 b_USD_exchange = self.exchange(b_USD) 3014 amounts = ZakatTracker.create_random_list(b_USD_balance, max_value=1000) 3015 if debug: 3016 print('amounts', amounts) 3017 i = 0 3018 for x in amounts: 3019 if debug: 3020 print(f'{i} - transfer-with-exchange({x})') 3021 self.transfer( 3022 unscaled_amount=self.unscale(x), 3023 from_account=b_USD, 3024 to_account=a_SAR, 3025 desc=f"{x} USD -> SAR", 3026 debug=debug, 3027 ) 3028 3029 b_USD_balance -= x 3030 cached_value = self.balance(b_USD, cached=True) 3031 fresh_value = self.balance(b_USD, cached=False) 3032 if debug: 3033 print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3034 b_USD_balance) 3035 assert cached_value == b_USD_balance 3036 assert fresh_value == b_USD_balance 3037 3038 a_SAR_balance += int(x * b_USD_exchange['rate']) 3039 cached_value = self.balance(a_SAR, cached=True) 3040 fresh_value = self.balance(a_SAR, cached=False) 3041 if debug: 3042 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3043 a_SAR_balance, 'rate', b_USD_exchange['rate']) 3044 assert cached_value == a_SAR_balance 3045 assert fresh_value == a_SAR_balance 3046 i += 1 3047 3048 # Transfer all in many chunks randomly from C to A 3049 c_SAR_balance = 37500 3050 amounts = ZakatTracker.create_random_list(c_SAR_balance, max_value=1000) 3051 if debug: 3052 print('amounts', amounts) 3053 i = 0 3054 for x in amounts: 3055 if debug: 3056 print(f'{i} - transfer-with-exchange({x})') 3057 self.transfer( 3058 unscaled_amount=self.unscale(x), 3059 from_account=c_SAR, 3060 to_account=a_SAR, 3061 desc=f"{x} SAR -> a_SAR", 3062 debug=debug, 3063 ) 3064 3065 c_SAR_balance -= x 3066 cached_value = self.balance(c_SAR, cached=True) 3067 fresh_value = self.balance(c_SAR, cached=False) 3068 if debug: 3069 print('account', c_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3070 c_SAR_balance) 3071 assert cached_value == c_SAR_balance 3072 assert fresh_value == c_SAR_balance 3073 3074 a_SAR_balance += x 3075 cached_value = self.balance(a_SAR, cached=True) 3076 fresh_value = self.balance(a_SAR, cached=False) 3077 if debug: 3078 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3079 a_SAR_balance) 3080 assert cached_value == a_SAR_balance 3081 assert fresh_value == a_SAR_balance 3082 i += 1 3083 3084 assert self.export_json("accounts-transfer-with-exchange-rates.json") 3085 assert self.save(f"accounts-transfer-with-exchange-rates.{self.ext()}") 3086 3087 # check & zakat with exchange rates for many cycles 3088 3089 for rate, values in { 3090 1: { 3091 'in': [1000, 2000, 10000], 3092 'exchanged': [100000, 200000, 1000000], 3093 'out': [2500, 5000, 73140], 3094 }, 3095 3.75: { 3096 'in': [200, 1000, 5000], 3097 'exchanged': [75000, 375000, 1875000], 3098 'out': [1875, 9375, 137138], 3099 }, 3100 }.items(): 3101 a, b, c = values['in'] 3102 m, n, o = values['exchanged'] 3103 x, y, z = values['out'] 3104 if debug: 3105 print('rate', rate, 'values', values) 3106 for case in [ 3107 (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3108 {'safe': {0: {'below_nisab': x}}}, 3109 ], False, m), 3110 (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3111 {'safe': {0: {'count': 1, 'total': y}}}, 3112 ], True, n), 3113 (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [ 3114 {'cave': {0: {'count': 3, 'total': z}}}, 3115 ], True, o), 3116 ]: 3117 if debug: 3118 print(f"############# check(rate: {rate}) #############") 3119 print('case', case) 3120 self.reset() 3121 self.exchange(account=case[1], created=case[2], rate=rate) 3122 self.track(unscaled_value=case[0], desc='test-check', account=case[1], logging=True, created=case[2]) 3123 assert self.snapshot() 3124 3125 # assert self.nolock() 3126 # history_size = len(self._vault['history']) 3127 # print('history_size', history_size) 3128 # assert history_size == 2 3129 assert self.lock() 3130 assert not self.nolock() 3131 report = self.check(2.17, None, debug) 3132 (valid, brief, plan) = report 3133 if debug: 3134 print('brief', brief) 3135 assert valid == case[4] 3136 assert case[5] == brief[0] 3137 assert case[5] == brief[1] 3138 3139 if debug: 3140 pp().pprint(plan) 3141 3142 for x in plan: 3143 assert case[1] == x 3144 if 'total' in case[3][0][x][0].keys(): 3145 assert case[3][0][x][0]['total'] == int(brief[2]) 3146 assert int(plan[x][0]['total']) == case[3][0][x][0]['total'] 3147 assert int(plan[x][0]['count']) == case[3][0][x][0]['count'] 3148 else: 3149 assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab'] 3150 if debug: 3151 pp().pprint(report) 3152 result = self.zakat(report, debug=debug) 3153 if debug: 3154 print('zakat-result', result, case[4]) 3155 assert result == case[4] 3156 report = self.check(2.17, None, debug) 3157 (valid, brief, plan) = report 3158 assert valid is False 3159 3160 history_size = len(self._vault['history']) 3161 if debug: 3162 print('history_size', history_size) 3163 assert history_size == 3 3164 assert not self.nolock() 3165 assert self.recall(False, debug) is False 3166 self.free(self.lock()) 3167 assert self.nolock() 3168 3169 for i in range(3, 0, -1): 3170 history_size = len(self._vault['history']) 3171 if debug: 3172 print('history_size', history_size) 3173 assert history_size == i 3174 assert self.recall(False, debug) is True 3175 3176 assert self.nolock() 3177 assert self.recall(False, debug) is False 3178 3179 history_size = len(self._vault['history']) 3180 if debug: 3181 print('history_size', history_size) 3182 assert history_size == 0 3183 3184 account_size = len(self._vault['account']) 3185 if debug: 3186 print('account_size', account_size) 3187 assert account_size == 0 3188 3189 report_size = len(self._vault['report']) 3190 if debug: 3191 print('report_size', report_size) 3192 assert report_size == 0 3193 3194 assert self.nolock() 3195 return True 3196 except Exception as e: 3197 # pp().pprint(self._vault) 3198 assert self.export_json("test-snapshot.json") 3199 assert self.save(f"test-snapshot.{self.ext()}") 3200 raise e
A class for tracking and calculating Zakat.
This class provides functionalities for recording transactions, calculating Zakat due, and managing account balances. It also offers features like importing transactions from CSV files, exporting data to JSON format, and saving/loading the tracker state.
The ZakatTracker
class is designed to handle both positive and negative transactions,
allowing for flexible tracking of financial activities related to Zakat. It also supports
the concept of a "Nisab" (minimum threshold for Zakat) and a "haul" (complete one year for Transaction) can calculate Zakat due
based on the current silver price.
The class uses a camel file as its database to persist the tracker state, ensuring data integrity across sessions. It also provides options for enabling or disabling history tracking, allowing users to choose their preferred level of detail.
In addition, the ZakatTracker
class includes various helper methods like
time
, time_to_datetime
, lock
, free
, recall
, export_json
,
and more. These methods provide additional functionalities and flexibility
for interacting with and managing the Zakat tracker.
Attributes: ZakatTracker.ZakatCut (function): A function to calculate the Zakat percentage. ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat. ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price. ZakatTracker.Version (function): The version of the ZakatTracker class.
Data Structure: The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data.
_vault (dict):
- account (dict):
- {account_number} (dict):
- balance (int): The current balance of the account.
- box (dict): A dictionary storing transaction details.
- {timestamp} (dict):
- capital (int): The initial amount of the transaction.
- count (int): The number of times Zakat has been calculated for this transaction.
- last (int): The timestamp of the last Zakat calculation.
- rest (int): The remaining amount after Zakat deductions and withdrawal.
- total (int): The total Zakat deducted from this transaction.
- count (int): The total number of transactions for the account.
- log (dict): A dictionary storing transaction logs.
- {timestamp} (dict):
- value (int): The transaction amount (positive or negative).
- desc (str): The description of the transaction.
- ref (int): The box reference (positive or None).
- file (dict): A dictionary storing file references associated with the transaction.
- hide (bool): Indicates whether the account is hidden or not.
- zakatable (bool): Indicates whether the account is subject to Zakat.
- exchange (dict):
- account (dict):
- {timestamps} (dict):
- rate (float): Exchange rate when compared to local currency.
- description (str): The description of the exchange rate.
- history (dict):
- {timestamp} (list): A list of dictionaries storing the history of actions performed.
- {action_dict} (dict):
- action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT).
- account (str): The account number associated with the action.
- ref (int): The reference number of the transaction.
- file (int): The reference number of the file (if applicable).
- key (str): The key associated with the action (e.g., 'rest', 'total').
- value (int): The value associated with the action.
- math (MathOperation): The mathematical operation performed (if applicable).
- lock (int or None): The timestamp indicating the current lock status (None if not locked).
- report (dict):
- {timestamp} (tuple): A tuple storing Zakat report details.
270 def __init__(self, db_path: str = "./zakat_db/zakat.camel", history_mode: bool = True): 271 """ 272 Initialize ZakatTracker with database path and history mode. 273 274 Parameters: 275 db_path (str): The path to the database file. Default is "zakat.camel". 276 history_mode (bool): The mode for tracking history. Default is True. 277 278 Returns: 279 None 280 """ 281 self._base_path = None 282 self._vault_path = None 283 self._vault = None 284 self.reset() 285 self._history(history_mode) 286 self.path(db_path)
Initialize ZakatTracker with database path and history mode.
Parameters: db_path (str): The path to the database file. Default is "zakat.camel". history_mode (bool): The mode for tracking history. Default is True.
Returns: None
194 @staticmethod 195 def Version() -> str: 196 """ 197 Returns the current version of the software. 198 199 This function returns a string representing the current version of the software, 200 including major, minor, and patch version numbers in the format "X.Y.Z". 201 202 Returns: 203 str: The current version of the software. 204 """ 205 return '0.2.86'
Returns the current version of the software.
This function returns a string representing the current version of the software, including major, minor, and patch version numbers in the format "X.Y.Z".
Returns: str: The current version of the software.
207 @staticmethod 208 def ZakatCut(x: float) -> float: 209 """ 210 Calculates the Zakat amount due on an asset. 211 212 This function calculates the zakat amount due on a given asset value over one lunar year. 213 Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth 214 that exceeds a certain threshold (Nisab). 215 216 Parameters: 217 x: The total value of the asset on which Zakat is to be calculated. 218 219 Returns: 220 The amount of Zakat due on the asset, calculated as 2.5% of the asset's value. 221 """ 222 return 0.025 * x # Zakat Cut in one Lunar Year
Calculates the Zakat amount due on an asset.
This function calculates the zakat amount due on a given asset value over one lunar year. Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth that exceeds a certain threshold (Nisab).
Parameters: x: The total value of the asset on which Zakat is to be calculated.
Returns: The amount of Zakat due on the asset, calculated as 2.5% of the asset's value.
224 @staticmethod 225 def TimeCycle(days: int = 355) -> int: 226 """ 227 Calculates the approximate duration of a lunar year in nanoseconds. 228 229 This function calculates the approximate duration of a lunar year based on the given number of days. 230 It converts the given number of days into nanoseconds for use in high-precision timing applications. 231 232 Parameters: 233 days: The number of days in a lunar year. Defaults to 355, 234 which is an approximation of the average length of a lunar year. 235 236 Returns: 237 The approximate duration of a lunar year in nanoseconds. 238 """ 239 return int(60 * 60 * 24 * days * 1e9) # Lunar Year in nanoseconds
Calculates the approximate duration of a lunar year in nanoseconds.
This function calculates the approximate duration of a lunar year based on the given number of days. It converts the given number of days into nanoseconds for use in high-precision timing applications.
Parameters: days: The number of days in a lunar year. Defaults to 355, which is an approximation of the average length of a lunar year.
Returns: The approximate duration of a lunar year in nanoseconds.
241 @staticmethod 242 def Nisab(gram_price: float, gram_quantity: float = 595) -> float: 243 """ 244 Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram. 245 246 This function calculates the Nisab value, which is the minimum threshold of wealth, 247 that makes an individual liable for paying Zakat. 248 The Nisab value is determined by the equivalent value of a specific amount 249 of gold or silver (currently 595 grams in silver) in the local currency. 250 251 Parameters: 252 - gram_price (float): The price per gram of Nisab. 253 - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver. 254 255 Returns: 256 - float: The total value of Nisab based on the given price per gram. 257 """ 258 return gram_price * gram_quantity
Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram.
This function calculates the Nisab value, which is the minimum threshold of wealth, that makes an individual liable for paying Zakat. The Nisab value is determined by the equivalent value of a specific amount of gold or silver (currently 595 grams in silver) in the local currency.
Parameters:
- gram_price (float): The price per gram of Nisab.
- gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver.
Returns:
- float: The total value of Nisab based on the given price per gram.
260 @staticmethod 261 def ext() -> str: 262 """ 263 Returns the file extension used by the ZakatTracker class. 264 265 Returns: 266 str: The file extension used by the ZakatTracker class, which is 'camel'. 267 """ 268 return 'camel'
Returns the file extension used by the ZakatTracker class.
Returns: str: The file extension used by the ZakatTracker class, which is 'camel'.
288 def path(self, path: str = None) -> str: 289 """ 290 Set or get the path to the database file. 291 292 If no path is provided, the current path is returned. 293 If a path is provided, it is set as the new path. 294 The function also creates the necessary directories if the provided path is a file. 295 296 Parameters: 297 path (str): The new path to the database file. If not provided, the current path is returned. 298 299 Returns: 300 str: The current or new path to the database file. 301 """ 302 if path is None: 303 return self._vault_path 304 self._vault_path = Path(path).resolve() 305 base_path = Path(path).resolve() 306 if base_path.is_file() or base_path.suffix: 307 base_path = base_path.parent 308 base_path.mkdir(parents=True, exist_ok=True) 309 self._base_path = base_path 310 return str(self._vault_path)
Set or get the path to the database file.
If no path is provided, the current path is returned. If a path is provided, it is set as the new path. The function also creates the necessary directories if the provided path is a file.
Parameters: path (str): The new path to the database file. If not provided, the current path is returned.
Returns: str: The current or new path to the database file.
312 def base_path(self, *args) -> str: 313 """ 314 Generate a base path by joining the provided arguments with the existing base path. 315 316 Parameters: 317 *args (str): Variable length argument list of strings to be joined with the base path. 318 319 Returns: 320 str: The generated base path. If no arguments are provided, the existing base path is returned. 321 """ 322 if not args: 323 return str(self._base_path) 324 filtered_args = [] 325 ignored_filename = None 326 for arg in args: 327 if Path(arg).suffix: 328 ignored_filename = arg 329 else: 330 filtered_args.append(arg) 331 base_path = Path(self._base_path) 332 full_path = base_path.joinpath(*filtered_args) 333 full_path.mkdir(parents=True, exist_ok=True) 334 if ignored_filename is not None: 335 return full_path.resolve() / ignored_filename # Join with the ignored filename 336 return str(full_path.resolve())
Generate a base path by joining the provided arguments with the existing base path.
Parameters: *args (str): Variable length argument list of strings to be joined with the base path.
Returns: str: The generated base path. If no arguments are provided, the existing base path is returned.
338 @staticmethod 339 def scale(x: float | int | Decimal, decimal_places: int = 2) -> int: 340 """ 341 Scales a numerical value by a specified power of 10, returning an integer. 342 343 This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and 344 facilitate precise scaling operations, particularly useful in financial or scientific calculations. 345 346 Parameters: 347 x: The numeric value to scale. Can be a floating-point number, integer, or decimal. 348 decimal_places: The exponent for the scaling factor (10**y). Defaults to 2, meaning the input is scaled 349 by a factor of 100 (e.g., converts 1.23 to 123). 350 351 Returns: 352 The scaled value, rounded to the nearest integer. 353 354 Raises: 355 TypeError: If the input `x` is not a valid numeric type. 356 357 Examples: 358 >>> ZakatTracker.scale(3.14159) 359 314 360 >>> ZakatTracker.scale(1234, decimal_places=3) 361 1234000 362 >>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4) 363 50 364 """ 365 if not isinstance(x, (float, int, Decimal)): 366 raise TypeError("Input 'x' must be a float, int, or Decimal.") 367 return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))
Scales a numerical value by a specified power of 10, returning an integer.
This function is designed to handle various numeric types (float
, int
, or Decimal
) and
facilitate precise scaling operations, particularly useful in financial or scientific calculations.
Parameters: x: The numeric value to scale. Can be a floating-point number, integer, or decimal. decimal_places: The exponent for the scaling factor (10**y). Defaults to 2, meaning the input is scaled by a factor of 100 (e.g., converts 1.23 to 123).
Returns: The scaled value, rounded to the nearest integer.
Raises:
TypeError: If the input x
is not a valid numeric type.
Examples:
>>> ZakatTracker.scale(3.14159)
314
>>> ZakatTracker.scale(1234, decimal_places=3)
1234000
>>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4)
50
369 @staticmethod 370 def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal: 371 """ 372 Unscales an integer by a power of 10. 373 374 Parameters: 375 x: The integer to unscale. 376 return_type: The desired type for the returned value. Can be float, int, or Decimal. Defaults to float. 377 decimal_places: The power of 10 to use. Defaults to 2. 378 379 Returns: 380 The unscaled number, converted to the specified return_type. 381 382 Raises: 383 TypeError: If the return_type is not float or Decimal. 384 """ 385 if return_type not in (float, Decimal): 386 raise TypeError(f'Invalid return_type({return_type}). Supported types are float, int, and Decimal.') 387 return round(return_type(x / (10 ** decimal_places)), decimal_places)
Unscales an integer by a power of 10.
Parameters: x: The integer to unscale. return_type: The desired type for the returned value. Can be float, int, or Decimal. Defaults to float. decimal_places: The power of 10 to use. Defaults to 2.
Returns: The unscaled number, converted to the specified return_type.
Raises: TypeError: If the return_type is not float or Decimal.
403 def reset(self) -> None: 404 """ 405 Reset the internal data structure to its initial state. 406 407 Parameters: 408 None 409 410 Returns: 411 None 412 """ 413 self._vault = { 414 'account': {}, 415 'exchange': {}, 416 'history': {}, 417 'lock': None, 418 'report': {}, 419 }
Reset the internal data structure to its initial state.
Parameters: None
Returns: None
421 @staticmethod 422 def time(now: datetime = None) -> int: 423 """ 424 Generates a timestamp based on the provided datetime object or the current datetime. 425 426 Parameters: 427 now (datetime, optional): The datetime object to generate the timestamp from. 428 If not provided, the current datetime is used. 429 430 Returns: 431 int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970), 432 before 1970 will return in negative until 1000AD. 433 """ 434 if now is None: 435 now = datetime.datetime.now() 436 ordinal_day = now.toordinal() 437 ns_in_day = (now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() * 10 ** 9 438 return int((ordinal_day - 719_163) * 86_400_000_000_000 + ns_in_day)
Generates a timestamp based on the provided datetime object or the current datetime.
Parameters: now (datetime, optional): The datetime object to generate the timestamp from. If not provided, the current datetime is used.
Returns: int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970), before 1970 will return in negative until 1000AD.
440 @staticmethod 441 def time_to_datetime(ordinal_ns: int) -> datetime: 442 """ 443 Converts an ordinal number (number of days since 1000-01-01) to a datetime object. 444 445 Parameters: 446 ordinal_ns (int): The ordinal number of days since 1000-01-01. 447 448 Returns: 449 datetime: The corresponding datetime object. 450 """ 451 ordinal_day = ordinal_ns // 86_400_000_000_000 + 719_163 452 ns_in_day = ordinal_ns % 86_400_000_000_000 453 d = datetime.datetime.fromordinal(ordinal_day) 454 t = datetime.timedelta(seconds=ns_in_day // 10 ** 9) 455 return datetime.datetime.combine(d, datetime.time()) + t
Converts an ordinal number (number of days since 1000-01-01) to a datetime object.
Parameters: ordinal_ns (int): The ordinal number of days since 1000-01-01.
Returns: datetime: The corresponding datetime object.
457 def clean_history(self, lock: int | None = None) -> int: 458 """ 459 Cleans up the history of actions performed on the ZakatTracker instance. 460 461 Parameters: 462 lock (int, optional): The lock ID is used to clean up the empty history. 463 If not provided, it cleans up the empty history records for all locks. 464 465 Returns: 466 int: The number of locks cleaned up. 467 """ 468 count = 0 469 if lock in self._vault['history']: 470 if len(self._vault['history'][lock]) <= 0: 471 count += 1 472 del self._vault['history'][lock] 473 return count 474 self.free(self.lock()) 475 for lock in self._vault['history']: 476 if len(self._vault['history'][lock]) <= 0: 477 count += 1 478 del self._vault['history'][lock] 479 return count
Cleans up the history of actions performed on the ZakatTracker instance.
Parameters: lock (int, optional): The lock ID is used to clean up the empty history. If not provided, it cleans up the empty history records for all locks.
Returns: int: The number of locks cleaned up.
517 def nolock(self) -> bool: 518 """ 519 Check if the vault lock is currently not set. 520 521 Returns: 522 bool: True if the vault lock is not set, False otherwise. 523 """ 524 return self._vault['lock'] is None
Check if the vault lock is currently not set.
Returns: bool: True if the vault lock is not set, False otherwise.
526 def lock(self) -> int: 527 """ 528 Acquires a lock on the ZakatTracker instance. 529 530 Returns: 531 int: The lock ID. This ID can be used to release the lock later. 532 """ 533 return self._step()
Acquires a lock on the ZakatTracker instance.
Returns: int: The lock ID. This ID can be used to release the lock later.
535 def vault(self) -> dict: 536 """ 537 Returns a copy of the internal vault dictionary. 538 539 This method is used to retrieve the current state of the ZakatTracker object. 540 It provides a snapshot of the internal data structure, allowing for further 541 processing or analysis. 542 543 Returns: 544 dict: A copy of the internal vault dictionary. 545 """ 546 return self._vault.copy()
Returns a copy of the internal vault dictionary.
This method is used to retrieve the current state of the ZakatTracker object. It provides a snapshot of the internal data structure, allowing for further processing or analysis.
Returns: dict: A copy of the internal vault dictionary.
548 def stats(self) -> dict[str, tuple]: 549 """ 550 Calculates and returns statistics about the object's data storage. 551 552 This method determines the size of the database file on disk and the 553 size of the data currently held in RAM (likely within a dictionary). 554 Both sizes are reported in bytes and in a human-readable format 555 (e.g., KB, MB). 556 557 Returns: 558 dict[str, tuple]: A dictionary containing the following statistics: 559 560 * 'database': A tuple with two elements: 561 - The database file size in bytes (int). 562 - The database file size in human-readable format (str). 563 * 'ram': A tuple with two elements: 564 - The RAM usage (dictionary size) in bytes (int). 565 - The RAM usage in human-readable format (str). 566 567 Example: 568 >>> stats = my_object.stats() 569 >>> print(stats['database']) 570 (256000, '250.0 KB') 571 >>> print(stats['ram']) 572 (12345, '12.1 KB') 573 """ 574 ram_size = self.get_dict_size(self.vault()) 575 file_size = os.path.getsize(self.path()) 576 return { 577 'database': (file_size, self.human_readable_size(file_size)), 578 'ram': (ram_size, self.human_readable_size(ram_size)), 579 }
Calculates and returns statistics about the object's data storage.
This method determines the size of the database file on disk and the size of the data currently held in RAM (likely within a dictionary). Both sizes are reported in bytes and in a human-readable format (e.g., KB, MB).
Returns: dict[str, tuple]: A dictionary containing the following statistics:
* 'database': A tuple with two elements:
- The database file size in bytes (int).
- The database file size in human-readable format (str).
* 'ram': A tuple with two elements:
- The RAM usage (dictionary size) in bytes (int).
- The RAM usage in human-readable format (str).
Example:
>>> stats = my_object.stats()
>>> print(stats['database'])
(256000, '250.0 KB')
>>> print(stats['ram'])
(12345, '12.1 KB')
581 def files(self) -> list[dict[str, str | int]]: 582 """ 583 Retrieves information about files associated with this class. 584 585 This class method provides a standardized way to gather details about 586 files used by the class for storage, snapshots, and CSV imports. 587 588 Returns: 589 list[dict[str, str | int]]: A list of dictionaries, each containing information 590 about a specific file: 591 592 * type (str): The type of file ('database', 'snapshot', 'import_csv'). 593 * path (str): The full file path. 594 * exists (bool): Whether the file exists on the filesystem. 595 * size (int): The file size in bytes (0 if the file doesn't exist). 596 * human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB'). 597 598 Example: 599 ``` 600 file_info = MyClass.files() 601 for info in file_info: 602 print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}") 603 ``` 604 """ 605 result = [] 606 for file_type, path in { 607 'database': self.path(), 608 'snapshot': self.snapshot_cache_path(), 609 'import_csv': self.import_csv_cache_path(), 610 }.items(): 611 exists = os.path.exists(path) 612 size = os.path.getsize(path) if exists else 0 613 human_readable_size = self.human_readable_size(size) if exists else 0 614 result.append({ 615 'type': file_type, 616 'path': path, 617 'exists': exists, 618 'size': size, 619 'human_readable_size': human_readable_size, 620 }) 621 return result
Retrieves information about files associated with this class.
This class method provides a standardized way to gather details about files used by the class for storage, snapshots, and CSV imports.
Returns: list[dict[str, str | int]]: A list of dictionaries, each containing information about a specific file:
* type (str): The type of file ('database', 'snapshot', 'import_csv').
* path (str): The full file path.
* exists (bool): Whether the file exists on the filesystem.
* size (int): The file size in bytes (0 if the file doesn't exist).
* human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB').
Example:
file_info = MyClass.files()
for info in file_info:
print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}")
623 def steps(self) -> dict: 624 """ 625 Returns a copy of the history of steps taken in the ZakatTracker. 626 627 The history is a dictionary where each key is a unique identifier for a step, 628 and the corresponding value is a dictionary containing information about the step. 629 630 Returns: 631 dict: A copy of the history of steps taken in the ZakatTracker. 632 """ 633 return self._vault['history'].copy()
Returns a copy of the history of steps taken in the ZakatTracker.
The history is a dictionary where each key is a unique identifier for a step, and the corresponding value is a dictionary containing information about the step.
Returns: dict: A copy of the history of steps taken in the ZakatTracker.
635 def free(self, lock: int, auto_save: bool = True) -> bool: 636 """ 637 Releases the lock on the database. 638 639 Parameters: 640 lock (int): The lock ID to be released. 641 auto_save (bool): Whether to automatically save the database after releasing the lock. 642 643 Returns: 644 bool: True if the lock is successfully released and (optionally) saved, False otherwise. 645 """ 646 if lock == self._vault['lock']: 647 self._vault['lock'] = None 648 self.clean_history(lock) 649 if auto_save: 650 return self.save(self.path()) 651 return True 652 return False
Releases the lock on the database.
Parameters: lock (int): The lock ID to be released. auto_save (bool): Whether to automatically save the database after releasing the lock.
Returns: bool: True if the lock is successfully released and (optionally) saved, False otherwise.
654 def account_exists(self, account) -> bool: 655 """ 656 Check if the given account exists in the vault. 657 658 Parameters: 659 account (str): The account number to check. 660 661 Returns: 662 bool: True if the account exists, False otherwise. 663 """ 664 return account in self._vault['account']
Check if the given account exists in the vault.
Parameters: account (str): The account number to check.
Returns: bool: True if the account exists, False otherwise.
666 def box_size(self, account) -> int: 667 """ 668 Calculate the size of the box for a specific account. 669 670 Parameters: 671 account (str): The account number for which the box size needs to be calculated. 672 673 Returns: 674 int: The size of the box for the given account. If the account does not exist, -1 is returned. 675 """ 676 if self.account_exists(account): 677 return len(self._vault['account'][account]['box']) 678 return -1
Calculate the size of the box for a specific account.
Parameters: account (str): The account number for which the box size needs to be calculated.
Returns: int: The size of the box for the given account. If the account does not exist, -1 is returned.
680 def log_size(self, account) -> int: 681 """ 682 Get the size of the log for a specific account. 683 684 Parameters: 685 account (str): The account number for which the log size needs to be calculated. 686 687 Returns: 688 int: The size of the log for the given account. If the account does not exist, -1 is returned. 689 """ 690 if self.account_exists(account): 691 return len(self._vault['account'][account]['log']) 692 return -1
Get the size of the log for a specific account.
Parameters: account (str): The account number for which the log size needs to be calculated.
Returns: int: The size of the log for the given account. If the account does not exist, -1 is returned.
694 @staticmethod 695 def file_hash(file_path: str, algorithm: str = "blake2b") -> str: 696 """ 697 Calculates the hash of a file using the specified algorithm. 698 699 Parameters: 700 file_path (str): The path to the file. 701 algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b". 702 703 Returns: 704 str: The hexadecimal representation of the file's hash. 705 """ 706 hash_obj = hashlib.new(algorithm) # Create the hash object 707 with open(file_path, "rb") as f: # Open file in binary mode for reading 708 for chunk in iter(lambda: f.read(4096), b""): # Read file in chunks 709 hash_obj.update(chunk) 710 return hash_obj.hexdigest() # Return the hash as a hexadecimal string
Calculates the hash of a file using the specified algorithm.
Parameters: file_path (str): The path to the file. algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b".
Returns: str: The hexadecimal representation of the file's hash.
712 def snapshot_cache_path(self): 713 """ 714 Generate the path for the cache file used to store snapshots. 715 716 The cache file is a camel file that stores the timestamps of the snapshots. 717 The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel". 718 719 Returns: 720 str: The path to the cache file. 721 """ 722 path = str(self.path()) 723 ext = self.ext() 724 ext_len = len(ext) 725 if path.endswith(f'.{ext}'): 726 path = path[:-ext_len-1] 727 _, filename = os.path.split(path + f'.snapshots.{ext}') 728 return self.base_path(filename)
Generate the path for the cache file used to store snapshots.
The cache file is a camel file that stores the timestamps of the snapshots. The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel".
Returns: str: The path to the cache file.
730 def snapshot(self) -> bool: 731 """ 732 This function creates a snapshot of the current database state. 733 734 The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. 735 If a snapshot with the same hash exists, the function returns True without creating a new snapshot. 736 If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state 737 in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp. 738 739 Parameters: 740 None 741 742 Returns: 743 bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails. 744 """ 745 current_hash = self.file_hash(self.path()) 746 cache: dict[str, int] = {} # hash: time_ns 747 try: 748 with open(self.snapshot_cache_path(), 'r') as stream: 749 cache = camel.load(stream.read()) 750 except: 751 pass 752 if current_hash in cache: 753 return True 754 time = time_ns() 755 cache[current_hash] = time 756 if not self.save(self.base_path('snapshots', f'{time}.{self.ext()}')): 757 return False 758 with open(self.snapshot_cache_path(), 'w') as stream: 759 stream.write(camel.dump(cache)) 760 return True
This function creates a snapshot of the current database state.
The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. If a snapshot with the same hash exists, the function returns True without creating a new snapshot. If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp.
Parameters: None
Returns: bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails.
762 def snapshots(self, hide_missing: bool = True, verified_hash_only: bool = False) \ 763 -> dict[int, tuple[str, str, bool]]: 764 """ 765 Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status. 766 767 Parameters: 768 - hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True. 769 - verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False. 770 771 Returns: 772 - dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, 773 and the values are tuples containing the snapshot's hash, path, and existence status. 774 """ 775 cache: dict[str, int] = {} # hash: time_ns 776 try: 777 with open(self.snapshot_cache_path(), 'r') as stream: 778 cache = camel.load(stream.read()) 779 except: 780 pass 781 if not cache: 782 return {} 783 result: dict[int, tuple[str, str, bool]] = {} # time_ns: (hash, path, exists) 784 for file_hash, ref in cache.items(): 785 path = self.base_path('snapshots', f'{ref}.{self.ext()}') 786 exists = os.path.exists(path) 787 valid_hash = self.file_hash(path) == file_hash if verified_hash_only else True 788 if (verified_hash_only and not valid_hash) or (verified_hash_only and not exists): 789 continue 790 if exists or not hide_missing: 791 result[ref] = (file_hash, path, exists) 792 return result
Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status.
Parameters:
- hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True.
- verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False.
Returns:
- dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, and the values are tuples containing the snapshot's hash, path, and existence status.
794 def recall(self, dry=True, debug=False) -> bool: 795 """ 796 Revert the last operation. 797 798 Parameters: 799 dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. 800 debug (bool): If True, the function will print debug information. Default is False. 801 802 Returns: 803 bool: True if the operation was successful, False otherwise. 804 """ 805 if not self.nolock() or len(self._vault['history']) == 0: 806 return False 807 if len(self._vault['history']) <= 0: 808 return False 809 ref = sorted(self._vault['history'].keys())[-1] 810 if debug: 811 print('recall', ref) 812 memory = self._vault['history'][ref] 813 if debug: 814 print(type(memory), 'memory', memory) 815 limit = len(memory) + 1 816 sub_positive_log_negative = 0 817 for i in range(-1, -limit, -1): 818 x = memory[i] 819 if debug: 820 print(type(x), x) 821 match x['action']: 822 case Action.CREATE: 823 if x['account'] is not None: 824 if self.account_exists(x['account']): 825 if debug: 826 print('account', self._vault['account'][x['account']]) 827 assert len(self._vault['account'][x['account']]['box']) == 0 828 assert self._vault['account'][x['account']]['balance'] == 0 829 assert self._vault['account'][x['account']]['count'] == 0 830 if dry: 831 continue 832 del self._vault['account'][x['account']] 833 834 case Action.TRACK: 835 if x['account'] is not None: 836 if self.account_exists(x['account']): 837 if dry: 838 continue 839 self._vault['account'][x['account']]['balance'] -= x['value'] 840 self._vault['account'][x['account']]['count'] -= 1 841 del self._vault['account'][x['account']]['box'][x['ref']] 842 843 case Action.LOG: 844 if x['account'] is not None: 845 if self.account_exists(x['account']): 846 if x['ref'] in self._vault['account'][x['account']]['log']: 847 if dry: 848 continue 849 if sub_positive_log_negative == -x['value']: 850 self._vault['account'][x['account']]['count'] -= 1 851 sub_positive_log_negative = 0 852 box_ref = self._vault['account'][x['account']]['log'][x['ref']]['ref'] 853 if not box_ref is None: 854 assert self.box_exists(x['account'], box_ref) 855 box_value = self._vault['account'][x['account']]['log'][x['ref']]['value'] 856 assert box_value < 0 857 858 try: 859 self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value 860 except TypeError: 861 self._vault['account'][x['account']]['box'][box_ref]['rest'] += Decimal( 862 -box_value) 863 864 try: 865 self._vault['account'][x['account']]['balance'] += -box_value 866 except TypeError: 867 self._vault['account'][x['account']]['balance'] += Decimal(-box_value) 868 869 self._vault['account'][x['account']]['count'] -= 1 870 del self._vault['account'][x['account']]['log'][x['ref']] 871 872 case Action.SUB: 873 if x['account'] is not None: 874 if self.account_exists(x['account']): 875 if x['ref'] in self._vault['account'][x['account']]['box']: 876 if dry: 877 continue 878 self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value'] 879 self._vault['account'][x['account']]['balance'] += x['value'] 880 sub_positive_log_negative = x['value'] 881 882 case Action.ADD_FILE: 883 if x['account'] is not None: 884 if self.account_exists(x['account']): 885 if x['ref'] in self._vault['account'][x['account']]['log']: 886 if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']: 887 if dry: 888 continue 889 del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] 890 891 case Action.REMOVE_FILE: 892 if x['account'] is not None: 893 if self.account_exists(x['account']): 894 if x['ref'] in self._vault['account'][x['account']]['log']: 895 if dry: 896 continue 897 self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value'] 898 899 case Action.BOX_TRANSFER: 900 if x['account'] is not None: 901 if self.account_exists(x['account']): 902 if x['ref'] in self._vault['account'][x['account']]['box']: 903 if dry: 904 continue 905 self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value'] 906 907 case Action.EXCHANGE: 908 if x['account'] is not None: 909 if x['account'] in self._vault['exchange']: 910 if x['ref'] in self._vault['exchange'][x['account']]: 911 if dry: 912 continue 913 del self._vault['exchange'][x['account']][x['ref']] 914 915 case Action.REPORT: 916 if x['ref'] in self._vault['report']: 917 if dry: 918 continue 919 del self._vault['report'][x['ref']] 920 921 case Action.ZAKAT: 922 if x['account'] is not None: 923 if self.account_exists(x['account']): 924 if x['ref'] in self._vault['account'][x['account']]['box']: 925 if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]: 926 if dry: 927 continue 928 match x['math']: 929 case MathOperation.ADDITION: 930 self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x[ 931 'value'] 932 case MathOperation.EQUAL: 933 self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value'] 934 case MathOperation.SUBTRACTION: 935 self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x[ 936 'value'] 937 938 if not dry: 939 del self._vault['history'][ref] 940 return True
Revert the last operation.
Parameters: dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. debug (bool): If True, the function will print debug information. Default is False.
Returns: bool: True if the operation was successful, False otherwise.
942 def ref_exists(self, account: str, ref_type: str, ref: int) -> bool: 943 """ 944 Check if a specific reference (transaction) exists in the vault for a given account and reference type. 945 946 Parameters: 947 account (str): The account number for which to check the existence of the reference. 948 ref_type (str): The type of reference (e.g., 'box', 'log', etc.). 949 ref (int): The reference (transaction) number to check for existence. 950 951 Returns: 952 bool: True if the reference exists for the given account and reference type, False otherwise. 953 """ 954 if account in self._vault['account']: 955 return ref in self._vault['account'][account][ref_type] 956 return False
Check if a specific reference (transaction) exists in the vault for a given account and reference type.
Parameters: account (str): The account number for which to check the existence of the reference. ref_type (str): The type of reference (e.g., 'box', 'log', etc.). ref (int): The reference (transaction) number to check for existence.
Returns: bool: True if the reference exists for the given account and reference type, False otherwise.
958 def box_exists(self, account: str, ref: int) -> bool: 959 """ 960 Check if a specific box (transaction) exists in the vault for a given account and reference. 961 962 Parameters: 963 - account (str): The account number for which to check the existence of the box. 964 - ref (int): The reference (transaction) number to check for existence. 965 966 Returns: 967 - bool: True if the box exists for the given account and reference, False otherwise. 968 """ 969 return self.ref_exists(account, 'box', ref)
Check if a specific box (transaction) exists in the vault for a given account and reference.
Parameters:
- account (str): The account number for which to check the existence of the box.
- ref (int): The reference (transaction) number to check for existence.
Returns:
- bool: True if the box exists for the given account and reference, False otherwise.
971 def track(self, unscaled_value: float | int | Decimal = 0, desc: str = '', account: str = 1, logging: bool = True, 972 created: int = None, 973 debug: bool = False) -> int: 974 """ 975 This function tracks a transaction for a specific account. 976 977 Parameters: 978 unscaled_value (float | int | Decimal): The value of the transaction. Default is 0. 979 desc (str): The description of the transaction. Default is an empty string. 980 account (str): The account for which the transaction is being tracked. Default is '1'. 981 logging (bool): Whether to log the transaction. Default is True. 982 created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. 983 debug (bool): Whether to print debug information. Default is False. 984 985 Returns: 986 int: The timestamp of the transaction. 987 988 This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box. 989 990 Raises: 991 ValueError: The log transaction happened again in the same nanosecond time. 992 ValueError: The box transaction happened again in the same nanosecond time. 993 """ 994 if debug: 995 print('track', f'unscaled_value={unscaled_value}, debug={debug}') 996 if created is None: 997 created = self.time() 998 no_lock = self.nolock() 999 self.lock() 1000 if not self.account_exists(account): 1001 if debug: 1002 print(f"account {account} created") 1003 self._vault['account'][account] = { 1004 'balance': 0, 1005 'box': {}, 1006 'count': 0, 1007 'log': {}, 1008 'hide': False, 1009 'zakatable': True, 1010 } 1011 self._step(Action.CREATE, account) 1012 if unscaled_value == 0: 1013 if no_lock: 1014 self.free(self.lock()) 1015 return 0 1016 value = self.scale(unscaled_value) 1017 if logging: 1018 self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug) 1019 if debug: 1020 print('create-box', created) 1021 if self.box_exists(account, created): 1022 raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).") 1023 if debug: 1024 print('created-box', created) 1025 self._vault['account'][account]['box'][created] = { 1026 'capital': value, 1027 'count': 0, 1028 'last': 0, 1029 'rest': value, 1030 'total': 0, 1031 } 1032 self._step(Action.TRACK, account, ref=created, value=value) 1033 if no_lock: 1034 self.free(self.lock()) 1035 return created
This function tracks a transaction for a specific account.
Parameters: unscaled_value (float | int | Decimal): The value of the transaction. Default is 0. desc (str): The description of the transaction. Default is an empty string. account (str): The account for which the transaction is being tracked. Default is '1'. logging (bool): Whether to log the transaction. Default is True. created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. debug (bool): Whether to print debug information. Default is False.
Returns: int: The timestamp of the transaction.
This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box.
Raises: ValueError: The log transaction happened again in the same nanosecond time. ValueError: The box transaction happened again in the same nanosecond time.
1037 def log_exists(self, account: str, ref: int) -> bool: 1038 """ 1039 Checks if a specific transaction log entry exists for a given account. 1040 1041 Parameters: 1042 account (str): The account number associated with the transaction log. 1043 ref (int): The reference to the transaction log entry. 1044 1045 Returns: 1046 bool: True if the transaction log entry exists, False otherwise. 1047 """ 1048 return self.ref_exists(account, 'log', ref)
Checks if a specific transaction log entry exists for a given account.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry.
Returns: bool: True if the transaction log entry exists, False otherwise.
1094 def exchange(self, account, created: int = None, rate: float = None, description: str = None, 1095 debug: bool = False) -> dict: 1096 """ 1097 This method is used to record or retrieve exchange rates for a specific account. 1098 1099 Parameters: 1100 - account (str): The account number for which the exchange rate is being recorded or retrieved. 1101 - created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used. 1102 - rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate. 1103 - description (str): A description of the exchange rate. 1104 1105 Returns: 1106 - dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, 1107 it returns a dictionary with default values for the rate and description. 1108 """ 1109 if debug: 1110 print('exchange', f'debug={debug}') 1111 if created is None: 1112 created = self.time() 1113 no_lock = self.nolock() 1114 self.lock() 1115 if rate is not None: 1116 if rate <= 0: 1117 return dict() 1118 if account not in self._vault['exchange']: 1119 self._vault['exchange'][account] = {} 1120 if len(self._vault['exchange'][account]) == 0 and rate <= 1: 1121 return {"time": created, "rate": 1, "description": None} 1122 self._vault['exchange'][account][created] = {"rate": rate, "description": description} 1123 self._step(Action.EXCHANGE, account, ref=created, value=rate) 1124 if no_lock: 1125 self.free(self.lock()) 1126 if debug: 1127 print("exchange-created-1", 1128 f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1129 1130 if account in self._vault['exchange']: 1131 valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created] 1132 if valid_rates: 1133 latest_rate = max(valid_rates, key=lambda x: x[0]) 1134 if debug: 1135 print("exchange-read-1", 1136 f'account: {account}, created: {created}, rate:{rate}, description:{description}', 1137 'latest_rate', latest_rate) 1138 result = latest_rate[1] 1139 result['time'] = latest_rate[0] 1140 return result # إرجاع قاموس يحتوي على المعدل والوصف 1141 if debug: 1142 print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1143 return {"time": created, "rate": 1, "description": None} # إرجاع القيمة الافتراضية مع وصف فارغ
This method is used to record or retrieve exchange rates for a specific account.
Parameters:
- account (str): The account number for which the exchange rate is being recorded or retrieved.
- created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used.
- rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate.
- description (str): A description of the exchange rate.
Returns:
- dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, it returns a dictionary with default values for the rate and description.
1145 @staticmethod 1146 def exchange_calc(x: float, x_rate: float, y_rate: float) -> float: 1147 """ 1148 This function calculates the exchanged amount of a currency. 1149 1150 Args: 1151 x (float): The original amount of the currency. 1152 x_rate (float): The exchange rate of the original currency. 1153 y_rate (float): The exchange rate of the target currency. 1154 1155 Returns: 1156 float: The exchanged amount of the target currency. 1157 """ 1158 return (x * x_rate) / y_rate
This function calculates the exchanged amount of a currency.
Args: x (float): The original amount of the currency. x_rate (float): The exchange rate of the original currency. y_rate (float): The exchange rate of the target currency.
Returns: float: The exchanged amount of the target currency.
1160 def exchanges(self) -> dict: 1161 """ 1162 Retrieve the recorded exchange rates for all accounts. 1163 1164 Parameters: 1165 None 1166 1167 Returns: 1168 dict: A dictionary containing all recorded exchange rates. 1169 The keys are account names or numbers, and the values are dictionaries containing the exchange rates. 1170 Each exchange rate dictionary has timestamps as keys and exchange rate details as values. 1171 """ 1172 return self._vault['exchange'].copy()
Retrieve the recorded exchange rates for all accounts.
Parameters: None
Returns: dict: A dictionary containing all recorded exchange rates. The keys are account names or numbers, and the values are dictionaries containing the exchange rates. Each exchange rate dictionary has timestamps as keys and exchange rate details as values.
1174 def accounts(self) -> dict: 1175 """ 1176 Returns a dictionary containing account numbers as keys and their respective balances as values. 1177 1178 Parameters: 1179 None 1180 1181 Returns: 1182 dict: A dictionary where keys are account numbers and values are their respective balances. 1183 """ 1184 result = {} 1185 for i in self._vault['account']: 1186 result[i] = self._vault['account'][i]['balance'] 1187 return result
Returns a dictionary containing account numbers as keys and their respective balances as values.
Parameters: None
Returns: dict: A dictionary where keys are account numbers and values are their respective balances.
1189 def boxes(self, account) -> dict: 1190 """ 1191 Retrieve the boxes (transactions) associated with a specific account. 1192 1193 Parameters: 1194 account (str): The account number for which to retrieve the boxes. 1195 1196 Returns: 1197 dict: A dictionary containing the boxes associated with the given account. 1198 If the account does not exist, an empty dictionary is returned. 1199 """ 1200 if self.account_exists(account): 1201 return self._vault['account'][account]['box'] 1202 return {}
Retrieve the boxes (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the boxes.
Returns: dict: A dictionary containing the boxes associated with the given account. If the account does not exist, an empty dictionary is returned.
1204 def logs(self, account) -> dict: 1205 """ 1206 Retrieve the logs (transactions) associated with a specific account. 1207 1208 Parameters: 1209 account (str): The account number for which to retrieve the logs. 1210 1211 Returns: 1212 dict: A dictionary containing the logs associated with the given account. 1213 If the account does not exist, an empty dictionary is returned. 1214 """ 1215 if self.account_exists(account): 1216 return self._vault['account'][account]['log'] 1217 return {}
Retrieve the logs (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the logs.
Returns: dict: A dictionary containing the logs associated with the given account. If the account does not exist, an empty dictionary is returned.
1219 def add_file(self, account: str, ref: int, path: str) -> int: 1220 """ 1221 Adds a file reference to a specific transaction log entry in the vault. 1222 1223 Parameters: 1224 account (str): The account number associated with the transaction log. 1225 ref (int): The reference to the transaction log entry. 1226 path (str): The path of the file to be added. 1227 1228 Returns: 1229 int: The reference of the added file. If the account or transaction log entry does not exist, returns 0. 1230 """ 1231 if self.account_exists(account): 1232 if ref in self._vault['account'][account]['log']: 1233 file_ref = self.time() 1234 self._vault['account'][account]['log'][ref]['file'][file_ref] = path 1235 no_lock = self.nolock() 1236 self.lock() 1237 self._step(Action.ADD_FILE, account, ref=ref, file=file_ref) 1238 if no_lock: 1239 self.free(self.lock()) 1240 return file_ref 1241 return 0
Adds a file reference to a specific transaction log entry in the vault.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry. path (str): The path of the file to be added.
Returns: int: The reference of the added file. If the account or transaction log entry does not exist, returns 0.
1243 def remove_file(self, account: str, ref: int, file_ref: int) -> bool: 1244 """ 1245 Removes a file reference from a specific transaction log entry in the vault. 1246 1247 Parameters: 1248 account (str): The account number associated with the transaction log. 1249 ref (int): The reference to the transaction log entry. 1250 file_ref (int): The reference of the file to be removed. 1251 1252 Returns: 1253 bool: True if the file reference is successfully removed, False otherwise. 1254 """ 1255 if self.account_exists(account): 1256 if ref in self._vault['account'][account]['log']: 1257 if file_ref in self._vault['account'][account]['log'][ref]['file']: 1258 x = self._vault['account'][account]['log'][ref]['file'][file_ref] 1259 del self._vault['account'][account]['log'][ref]['file'][file_ref] 1260 no_lock = self.nolock() 1261 self.lock() 1262 self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x) 1263 if no_lock: 1264 self.free(self.lock()) 1265 return True 1266 return False
Removes a file reference from a specific transaction log entry in the vault.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry. file_ref (int): The reference of the file to be removed.
Returns: bool: True if the file reference is successfully removed, False otherwise.
1268 def balance(self, account: str = 1, cached: bool = True) -> int: 1269 """ 1270 Calculate and return the balance of a specific account. 1271 1272 Parameters: 1273 account (str): The account number. Default is '1'. 1274 cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True. 1275 1276 Returns: 1277 int: The balance of the account. 1278 1279 Note: 1280 If cached is True, the function returns the cached balance. 1281 If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items. 1282 """ 1283 if cached: 1284 return self._vault['account'][account]['balance'] 1285 x = 0 1286 return [x := x + y['rest'] for y in self._vault['account'][account]['box'].values()][-1]
Calculate and return the balance of a specific account.
Parameters: account (str): The account number. Default is '1'. cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True.
Returns: int: The balance of the account.
Note: If cached is True, the function returns the cached balance. If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items.
1288 def hide(self, account, status: bool = None) -> bool: 1289 """ 1290 Check or set the hide status of a specific account. 1291 1292 Parameters: 1293 account (str): The account number. 1294 status (bool, optional): The new hide status. If not provided, the function will return the current status. 1295 1296 Returns: 1297 bool: The current or updated hide status of the account. 1298 1299 Raises: 1300 None 1301 1302 Example: 1303 >>> tracker = ZakatTracker() 1304 >>> ref = tracker.track(51, 'desc', 'account1') 1305 >>> tracker.hide('account1') # Set the hide status of 'account1' to True 1306 False 1307 >>> tracker.hide('account1', True) # Set the hide status of 'account1' to True 1308 True 1309 >>> tracker.hide('account1') # Get the hide status of 'account1' by default 1310 True 1311 >>> tracker.hide('account1', False) 1312 False 1313 """ 1314 if self.account_exists(account): 1315 if status is None: 1316 return self._vault['account'][account]['hide'] 1317 self._vault['account'][account]['hide'] = status 1318 return status 1319 return False
Check or set the hide status of a specific account.
Parameters: account (str): The account number. status (bool, optional): The new hide status. If not provided, the function will return the current status.
Returns: bool: The current or updated hide status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.hide('account1') # Set the hide status of 'account1' to True
False
>>> tracker.hide('account1', True) # Set the hide status of 'account1' to True
True
>>> tracker.hide('account1') # Get the hide status of 'account1' by default
True
>>> tracker.hide('account1', False)
False
1321 def zakatable(self, account, status: bool = None) -> bool: 1322 """ 1323 Check or set the zakatable status of a specific account. 1324 1325 Parameters: 1326 account (str): The account number. 1327 status (bool, optional): The new zakatable status. If not provided, the function will return the current status. 1328 1329 Returns: 1330 bool: The current or updated zakatable status of the account. 1331 1332 Raises: 1333 None 1334 1335 Example: 1336 >>> tracker = ZakatTracker() 1337 >>> ref = tracker.track(51, 'desc', 'account1') 1338 >>> tracker.zakatable('account1') # Set the zakatable status of 'account1' to True 1339 True 1340 >>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True 1341 True 1342 >>> tracker.zakatable('account1') # Get the zakatable status of 'account1' by default 1343 True 1344 >>> tracker.zakatable('account1', False) 1345 False 1346 """ 1347 if self.account_exists(account): 1348 if status is None: 1349 return self._vault['account'][account]['zakatable'] 1350 self._vault['account'][account]['zakatable'] = status 1351 return status 1352 return False
Check or set the zakatable status of a specific account.
Parameters: account (str): The account number. status (bool, optional): The new zakatable status. If not provided, the function will return the current status.
Returns: bool: The current or updated zakatable status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.zakatable('account1') # Set the zakatable status of 'account1' to True
True
>>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True
True
>>> tracker.zakatable('account1') # Get the zakatable status of 'account1' by default
True
>>> tracker.zakatable('account1', False)
False
1354 def sub(self, unscaled_value: float | int | Decimal, desc: str = '', account: str = 1, created: int = None, 1355 debug: bool = False) \ 1356 -> tuple[ 1357 int, 1358 list[ 1359 tuple[int, int], 1360 ], 1361 ] | tuple: 1362 """ 1363 Subtracts a specified value from an account's balance. 1364 1365 Parameters: 1366 unscaled_value (float | int | Decimal): The amount to be subtracted. 1367 desc (str): A description for the transaction. Defaults to an empty string. 1368 account (str): The account from which the value will be subtracted. Defaults to '1'. 1369 created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. 1370 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1371 1372 Returns: 1373 tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction. 1374 1375 If the amount to subtract is greater than the account's balance, 1376 the remaining amount will be transferred to a new transaction with a negative value. 1377 1378 Raises: 1379 ValueError: The box transaction happened again in the same nanosecond time. 1380 ValueError: The log transaction happened again in the same nanosecond time. 1381 """ 1382 if debug: 1383 print('sub', f'debug={debug}') 1384 if unscaled_value < 0: 1385 return tuple() 1386 if unscaled_value == 0: 1387 ref = self.track(unscaled_value, '', account) 1388 return ref, ref 1389 if created is None: 1390 created = self.time() 1391 no_lock = self.nolock() 1392 self.lock() 1393 self.track(0, '', account) 1394 value = self.scale(unscaled_value) 1395 self._log(value=-value, desc=desc, account=account, created=created, ref=None, debug=debug) 1396 ids = sorted(self._vault['account'][account]['box'].keys()) 1397 limit = len(ids) + 1 1398 target = value 1399 if debug: 1400 print('ids', ids) 1401 ages = [] 1402 for i in range(-1, -limit, -1): 1403 if target == 0: 1404 break 1405 j = ids[i] 1406 if debug: 1407 print('i', i, 'j', j) 1408 rest = self._vault['account'][account]['box'][j]['rest'] 1409 if rest >= target: 1410 self._vault['account'][account]['box'][j]['rest'] -= target 1411 self._step(Action.SUB, account, ref=j, value=target) 1412 ages.append((j, target)) 1413 target = 0 1414 break 1415 elif target > rest > 0: 1416 chunk = rest 1417 target -= chunk 1418 self._step(Action.SUB, account, ref=j, value=chunk) 1419 ages.append((j, chunk)) 1420 self._vault['account'][account]['box'][j]['rest'] = 0 1421 if target > 0: 1422 self.track( 1423 unscaled_value=self.unscale(-target), 1424 desc=desc, 1425 account=account, 1426 logging=False, 1427 created=created, 1428 ) 1429 ages.append((created, target)) 1430 if no_lock: 1431 self.free(self.lock()) 1432 return created, ages
Subtracts a specified value from an account's balance.
Parameters: unscaled_value (float | int | Decimal): The amount to be subtracted. desc (str): A description for the transaction. Defaults to an empty string. account (str): The account from which the value will be subtracted. Defaults to '1'. created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction.
If the amount to subtract is greater than the account's balance, the remaining amount will be transferred to a new transaction with a negative value.
Raises: ValueError: The box transaction happened again in the same nanosecond time. ValueError: The log transaction happened again in the same nanosecond time.
1434 def transfer(self, unscaled_amount: float | int | Decimal, from_account: str, to_account: str, desc: str = '', 1435 created: int = None, 1436 debug: bool = False) -> list[int]: 1437 """ 1438 Transfers a specified value from one account to another. 1439 1440 Parameters: 1441 unscaled_amount (float | int | Decimal): The amount to be transferred. 1442 from_account (str): The account from which the value will be transferred. 1443 to_account (str): The account to which the value will be transferred. 1444 desc (str, optional): A description for the transaction. Defaults to an empty string. 1445 created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. 1446 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1447 1448 Returns: 1449 list[int]: A list of timestamps corresponding to the transactions made during the transfer. 1450 1451 Raises: 1452 ValueError: Transfer to the same account is forbidden. 1453 ValueError: The box transaction happened again in the same nanosecond time. 1454 ValueError: The log transaction happened again in the same nanosecond time. 1455 """ 1456 if debug: 1457 print('transfer', f'debug={debug}') 1458 if from_account == to_account: 1459 raise ValueError(f'Transfer to the same account is forbidden. {to_account}') 1460 if unscaled_amount <= 0: 1461 return [] 1462 if created is None: 1463 created = self.time() 1464 (_, ages) = self.sub(unscaled_amount, desc, from_account, created, debug=debug) 1465 times = [] 1466 source_exchange = self.exchange(from_account, created) 1467 target_exchange = self.exchange(to_account, created) 1468 1469 if debug: 1470 print('ages', ages) 1471 1472 for age, value in ages: 1473 target_amount = int(self.exchange_calc(value, source_exchange['rate'], target_exchange['rate'])) 1474 if debug: 1475 print('target_amount', target_amount) 1476 # Perform the transfer 1477 if self.box_exists(to_account, age): 1478 if debug: 1479 print('box_exists', age) 1480 capital = self._vault['account'][to_account]['box'][age]['capital'] 1481 rest = self._vault['account'][to_account]['box'][age]['rest'] 1482 if debug: 1483 print( 1484 f"Transfer(loop) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1485 selected_age = age 1486 if rest + target_amount > capital: 1487 self._vault['account'][to_account]['box'][age]['capital'] += target_amount 1488 selected_age = ZakatTracker.time() 1489 self._vault['account'][to_account]['box'][age]['rest'] += target_amount 1490 self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount) 1491 y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account, 1492 created=None, ref=None, debug=debug) 1493 times.append((age, y)) 1494 continue 1495 if debug: 1496 print( 1497 f"Transfer(func) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1498 y = self.track( 1499 unscaled_value=self.unscale(int(target_amount)), 1500 desc=desc, 1501 account=to_account, 1502 logging=True, 1503 created=age, 1504 debug=debug, 1505 ) 1506 times.append(y) 1507 return times
Transfers a specified value from one account to another.
Parameters: unscaled_amount (float | int | Decimal): The amount to be transferred. from_account (str): The account from which the value will be transferred. to_account (str): The account to which the value will be transferred. desc (str, optional): A description for the transaction. Defaults to an empty string. created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: list[int]: A list of timestamps corresponding to the transactions made during the transfer.
Raises: ValueError: Transfer to the same account is forbidden. ValueError: The box transaction happened again in the same nanosecond time. ValueError: The log transaction happened again in the same nanosecond time.
1509 def check(self, silver_gram_price: float, unscaled_nisab: float | int | Decimal = None, debug: bool = False, now: int = None, 1510 cycle: float = None) -> tuple: 1511 """ 1512 Check the eligibility for Zakat based on the given parameters. 1513 1514 Parameters: 1515 silver_gram_price (float): The price of a gram of silver. 1516 unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, 1517 it will be calculated based on the silver_gram_price. 1518 debug (bool): Flag to enable debug mode. 1519 now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). 1520 cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle(). 1521 1522 Returns: 1523 tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, 1524 and a dictionary containing the Zakat plan. 1525 """ 1526 if debug: 1527 print('check', f'debug={debug}') 1528 if now is None: 1529 now = self.time() 1530 if cycle is None: 1531 cycle = ZakatTracker.TimeCycle() 1532 if unscaled_nisab is None: 1533 unscaled_nisab = ZakatTracker.Nisab(silver_gram_price) 1534 nisab = self.scale(unscaled_nisab) 1535 plan = {} 1536 below_nisab = 0 1537 brief = [0, 0, 0] 1538 valid = False 1539 if debug: 1540 print('exchanges', self.exchanges()) 1541 for x in self._vault['account']: 1542 if not self.zakatable(x): 1543 continue 1544 _box = self._vault['account'][x]['box'] 1545 _log = self._vault['account'][x]['log'] 1546 limit = len(_box) + 1 1547 ids = sorted(self._vault['account'][x]['box'].keys()) 1548 for i in range(-1, -limit, -1): 1549 j = ids[i] 1550 rest = float(_box[j]['rest']) 1551 if rest <= 0: 1552 continue 1553 exchange = self.exchange(x, created=self.time()) 1554 rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1) 1555 brief[0] += rest 1556 index = limit + i - 1 1557 epoch = (now - j) / cycle 1558 if debug: 1559 print(f"Epoch: {epoch}", _box[j]) 1560 if _box[j]['last'] > 0: 1561 epoch = (now - _box[j]['last']) / cycle 1562 if debug: 1563 print(f"Epoch: {epoch}") 1564 epoch = floor(epoch) 1565 if debug: 1566 print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch) 1567 if epoch == 0: 1568 continue 1569 if debug: 1570 print("Epoch - PASSED") 1571 brief[1] += rest 1572 if rest >= nisab: 1573 total = 0 1574 for _ in range(epoch): 1575 total += ZakatTracker.ZakatCut(float(rest) - float(total)) 1576 if total > 0: 1577 if x not in plan: 1578 plan[x] = {} 1579 valid = True 1580 brief[2] += total 1581 plan[x][index] = { 1582 'total': total, 1583 'count': epoch, 1584 'box_time': j, 1585 'box_capital': _box[j]['capital'], 1586 'box_rest': _box[j]['rest'], 1587 'box_last': _box[j]['last'], 1588 'box_total': _box[j]['total'], 1589 'box_count': _box[j]['count'], 1590 'box_log': _log[j]['desc'], 1591 'exchange_rate': exchange['rate'], 1592 'exchange_time': exchange['time'], 1593 'exchange_desc': exchange['description'], 1594 } 1595 else: 1596 chunk = ZakatTracker.ZakatCut(float(rest)) 1597 if chunk > 0: 1598 if x not in plan: 1599 plan[x] = {} 1600 if j not in plan[x].keys(): 1601 plan[x][index] = {} 1602 below_nisab += rest 1603 brief[2] += chunk 1604 plan[x][index]['below_nisab'] = chunk 1605 plan[x][index]['total'] = chunk 1606 plan[x][index]['count'] = epoch 1607 plan[x][index]['box_time'] = j 1608 plan[x][index]['box_capital'] = _box[j]['capital'] 1609 plan[x][index]['box_rest'] = _box[j]['rest'] 1610 plan[x][index]['box_last'] = _box[j]['last'] 1611 plan[x][index]['box_total'] = _box[j]['total'] 1612 plan[x][index]['box_count'] = _box[j]['count'] 1613 plan[x][index]['box_log'] = _log[j]['desc'] 1614 plan[x][index]['exchange_rate'] = exchange['rate'] 1615 plan[x][index]['exchange_time'] = exchange['time'] 1616 plan[x][index]['exchange_desc'] = exchange['description'] 1617 valid = valid or below_nisab >= nisab 1618 if debug: 1619 print(f"below_nisab({below_nisab}) >= nisab({nisab})") 1620 return valid, brief, plan
Check the eligibility for Zakat based on the given parameters.
Parameters: silver_gram_price (float): The price of a gram of silver. unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, it will be calculated based on the silver_gram_price. debug (bool): Flag to enable debug mode. now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle().
Returns: tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, and a dictionary containing the Zakat plan.
1622 def build_payment_parts(self, demand: float, positive_only: bool = True) -> dict: 1623 """ 1624 Build payment parts for the Zakat distribution. 1625 1626 Parameters: 1627 demand (float): The total demand for payment in local currency. 1628 positive_only (bool): If True, only consider accounts with positive balance. Default is True. 1629 1630 Returns: 1631 dict: A dictionary containing the payment parts for each account. The dictionary has the following structure: 1632 { 1633 'account': { 1634 'account_id': {'balance': float, 'rate': float, 'part': float}, 1635 ... 1636 }, 1637 'exceed': bool, 1638 'demand': float, 1639 'total': float, 1640 } 1641 """ 1642 total = 0 1643 parts = { 1644 'account': {}, 1645 'exceed': False, 1646 'demand': demand, 1647 } 1648 for x, y in self.accounts().items(): 1649 if positive_only and y <= 0: 1650 continue 1651 total += float(y) 1652 exchange = self.exchange(x) 1653 parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0} 1654 parts['total'] = total 1655 return parts
Build payment parts for the Zakat distribution.
Parameters: demand (float): The total demand for payment in local currency. positive_only (bool): If True, only consider accounts with positive balance. Default is True.
Returns: dict: A dictionary containing the payment parts for each account. The dictionary has the following structure: { 'account': { 'account_id': {'balance': float, 'rate': float, 'part': float}, ... }, 'exceed': bool, 'demand': float, 'total': float, }
1657 @staticmethod 1658 def check_payment_parts(parts: dict, debug: bool = False) -> int: 1659 """ 1660 Checks the validity of payment parts. 1661 1662 Parameters: 1663 parts (dict): A dictionary containing payment parts information. 1664 debug (bool): Flag to enable debug mode. 1665 1666 Returns: 1667 int: Returns 0 if the payment parts are valid, otherwise returns the error code. 1668 1669 Error Codes: 1670 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 1671 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 1672 3: 'part' value in parts['account'][x] is less than 0. 1673 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 1674 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 1675 6: The sum of 'part' values in parts['account'] does not match with 'demand' value. 1676 """ 1677 if debug: 1678 print('check_payment_parts', f'debug={debug}') 1679 for i in ['demand', 'account', 'total', 'exceed']: 1680 if i not in parts: 1681 return 1 1682 exceed = parts['exceed'] 1683 for x in parts['account']: 1684 for j in ['balance', 'rate', 'part']: 1685 if j not in parts['account'][x]: 1686 return 2 1687 if parts['account'][x]['part'] < 0: 1688 return 3 1689 if not exceed and parts['account'][x]['balance'] <= 0: 1690 return 4 1691 demand = parts['demand'] 1692 z = 0 1693 for _, y in parts['account'].items(): 1694 if not exceed and y['part'] > y['balance']: 1695 return 5 1696 z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1) 1697 z = round(z, 2) 1698 demand = round(demand, 2) 1699 if debug: 1700 print('check_payment_parts', f'z = {z}, demand = {demand}') 1701 print('check_payment_parts', type(z), type(demand)) 1702 print('check_payment_parts', z != demand) 1703 print('check_payment_parts', str(z) != str(demand)) 1704 if z != demand and str(z) != str(demand): 1705 return 6 1706 return 0
Checks the validity of payment parts.
Parameters: parts (dict): A dictionary containing payment parts information. debug (bool): Flag to enable debug mode.
Returns: int: Returns 0 if the payment parts are valid, otherwise returns the error code.
Error Codes: 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 3: 'part' value in parts['account'][x] is less than 0. 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 6: The sum of 'part' values in parts['account'] does not match with 'demand' value.
1708 def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool: 1709 """ 1710 Perform Zakat calculation based on the given report and optional parts. 1711 1712 Parameters: 1713 report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. 1714 parts (dict): A dictionary containing the payment parts for the zakat. 1715 debug (bool): A flag indicating whether to print debug information. 1716 1717 Returns: 1718 bool: True if the zakat calculation is successful, False otherwise. 1719 """ 1720 if debug: 1721 print('zakat', f'debug={debug}') 1722 valid, _, plan = report 1723 if not valid: 1724 return valid 1725 parts_exist = parts is not None 1726 if parts_exist: 1727 if self.check_payment_parts(parts, debug=debug) != 0: 1728 return False 1729 if debug: 1730 print('######### zakat #######') 1731 print('parts_exist', parts_exist) 1732 no_lock = self.nolock() 1733 self.lock() 1734 report_time = self.time() 1735 self._vault['report'][report_time] = report 1736 self._step(Action.REPORT, ref=report_time) 1737 created = self.time() 1738 for x in plan: 1739 target_exchange = self.exchange(x) 1740 if debug: 1741 print(plan[x]) 1742 print('-------------') 1743 print(self._vault['account'][x]['box']) 1744 ids = sorted(self._vault['account'][x]['box'].keys()) 1745 if debug: 1746 print('plan[x]', plan[x]) 1747 for i in plan[x].keys(): 1748 j = ids[i] 1749 if debug: 1750 print('i', i, 'j', j) 1751 self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'], 1752 key='last', 1753 math_operation=MathOperation.EQUAL) 1754 self._vault['account'][x]['box'][j]['last'] = created 1755 amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate'])) 1756 self._vault['account'][x]['box'][j]['total'] += amount 1757 self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total', 1758 math_operation=MathOperation.ADDITION) 1759 self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count'] 1760 self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count', 1761 math_operation=MathOperation.ADDITION) 1762 if not parts_exist: 1763 try: 1764 self._vault['account'][x]['box'][j]['rest'] -= amount 1765 except TypeError: 1766 self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount) 1767 # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest', 1768 # math_operation=MathOperation.SUBTRACTION) 1769 self._log(-float(amount), desc='zakat-زكاة', account=x, created=None, ref=j, debug=debug) 1770 if parts_exist: 1771 for account, part in parts['account'].items(): 1772 if part['part'] == 0: 1773 continue 1774 if debug: 1775 print('zakat-part', account, part['rate']) 1776 target_exchange = self.exchange(account) 1777 amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate']) 1778 self.sub(amount, desc='zakat-part-دفعة-زكاة', account=account, debug=debug) 1779 if no_lock: 1780 self.free(self.lock()) 1781 return True
Perform Zakat calculation based on the given report and optional parts.
Parameters: report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. parts (dict): A dictionary containing the payment parts for the zakat. debug (bool): A flag indicating whether to print debug information.
Returns: bool: True if the zakat calculation is successful, False otherwise.
1783 def export_json(self, path: str = "data.json") -> bool: 1784 """ 1785 Exports the current state of the ZakatTracker object to a JSON file. 1786 1787 Parameters: 1788 path (str): The path where the JSON file will be saved. Default is "data.json". 1789 1790 Returns: 1791 bool: True if the export is successful, False otherwise. 1792 1793 Raises: 1794 No specific exceptions are raised by this method. 1795 """ 1796 with open(path, "w") as file: 1797 json.dump(self._vault, file, indent=4, cls=JSONEncoder) 1798 return True
Exports the current state of the ZakatTracker object to a JSON file.
Parameters: path (str): The path where the JSON file will be saved. Default is "data.json".
Returns: bool: True if the export is successful, False otherwise.
Raises: No specific exceptions are raised by this method.
1800 def save(self, path: str = None) -> bool: 1801 """ 1802 Saves the ZakatTracker's current state to a camel file. 1803 1804 This method serializes the internal data (`_vault`). 1805 1806 Parameters: 1807 path (str, optional): File path for saving. Defaults to a predefined location. 1808 1809 Returns: 1810 bool: True if the save operation is successful, False otherwise. 1811 """ 1812 if path is None: 1813 path = self.path() 1814 with open(f'{path}.tmp', 'w') as stream: 1815 # first save in tmp file 1816 stream.write(camel.dump(self._vault)) 1817 # then move tmp file to original location 1818 shutil.move(f'{path}.tmp', path) 1819 return True
Saves the ZakatTracker's current state to a camel file.
This method serializes the internal data (_vault
).
Parameters: path (str, optional): File path for saving. Defaults to a predefined location.
Returns: bool: True if the save operation is successful, False otherwise.
1821 def load(self, path: str = None) -> bool: 1822 """ 1823 Load the current state of the ZakatTracker object from a camel file. 1824 1825 Parameters: 1826 path (str): The path where the camel file is located. If not provided, it will use the default path. 1827 1828 Returns: 1829 bool: True if the load operation is successful, False otherwise. 1830 """ 1831 if path is None: 1832 path = self.path() 1833 if os.path.exists(path): 1834 with open(path, 'r') as stream: 1835 self._vault = camel.load(stream.read()) 1836 return True 1837 return False
Load the current state of the ZakatTracker object from a camel file.
Parameters: path (str): The path where the camel file is located. If not provided, it will use the default path.
Returns: bool: True if the load operation is successful, False otherwise.
1839 def import_csv_cache_path(self): 1840 """ 1841 Generates the cache file path for imported CSV data. 1842 1843 This function constructs the file path where cached data from CSV imports 1844 will be stored. The cache file is a camel file (.camel extension) appended 1845 to the base path of the object. 1846 1847 Returns: 1848 str: The full path to the import CSV cache file. 1849 1850 Example: 1851 >>> obj = ZakatTracker('/data/reports') 1852 >>> obj.import_csv_cache_path() 1853 '/data/reports.import_csv.camel' 1854 """ 1855 path = str(self.path()) 1856 ext = self.ext() 1857 ext_len = len(ext) 1858 if path.endswith(f'.{ext}'): 1859 path = path[:-ext_len-1] 1860 _, filename = os.path.split(path + f'.import_csv.{ext}') 1861 return self.base_path(filename)
Generates the cache file path for imported CSV data.
This function constructs the file path where cached data from CSV imports will be stored. The cache file is a camel file (.camel extension) appended to the base path of the object.
Returns: str: The full path to the import CSV cache file.
Example:
obj = ZakatTracker('/data/reports') obj.import_csv_cache_path() '/data/reports.import_csv.camel'
1863 def import_csv(self, path: str = 'file.csv', debug: bool = False) -> tuple: 1864 """ 1865 The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system. 1866 1867 Parameters: 1868 path (str): The path to the CSV file. Default is 'file.csv'. 1869 debug (bool): A flag indicating whether to print debug information. 1870 1871 Returns: 1872 tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, 1873 and a dictionary of bad transactions. 1874 1875 Notes: 1876 * Currency Pair Assumption: This function assumes that the exchange rates stored for each account 1877 are appropriate for the currency pairs involved in the conversions. 1878 * The exchange rate for each account is based on the last encountered transaction rate that is not equal 1879 to 1.0 or the previous rate for that account. 1880 * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent 1881 transactions of the same account within the whole imported and existing dataset when doing `check` and 1882 `zakat` operations. 1883 1884 Example Usage: 1885 The CSV file should have the following format, rate is optional per transaction: 1886 account, desc, value, date, rate 1887 For example: 1888 safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1 1889 """ 1890 if debug: 1891 print('import_csv', f'debug={debug}') 1892 cache: list[int] = [] 1893 try: 1894 with open(self.import_csv_cache_path(), 'r') as stream: 1895 cache = camel.load(stream.read()) 1896 except: 1897 pass 1898 date_formats = [ 1899 "%Y-%m-%d %H:%M:%S", 1900 "%Y-%m-%dT%H:%M:%S", 1901 "%Y-%m-%dT%H%M%S", 1902 "%Y-%m-%d", 1903 ] 1904 created, found, bad = 0, 0, {} 1905 data: dict[int, list] = {} 1906 with open(path, newline='', encoding="utf-8") as f: 1907 i = 0 1908 for row in csv.reader(f, delimiter=','): 1909 i += 1 1910 hashed = hash(tuple(row)) 1911 if hashed in cache: 1912 found += 1 1913 continue 1914 account = row[0] 1915 desc = row[1] 1916 value = float(row[2]) 1917 rate = 1.0 1918 if row[4:5]: # Empty list if index is out of range 1919 rate = float(row[4]) 1920 date: int = 0 1921 for time_format in date_formats: 1922 try: 1923 date = self.time(datetime.datetime.strptime(row[3], time_format)) 1924 break 1925 except: 1926 pass 1927 # TODO: not allowed for negative dates in the future after enhance time functions 1928 if date == 0: 1929 bad[i] = row + ['invalid date'] 1930 if value == 0: 1931 bad[i] = row + ['invalid value'] 1932 continue 1933 if date not in data: 1934 data[date] = [] 1935 data[date].append((i, account, desc, value, date, rate, hashed)) 1936 1937 if debug: 1938 print('import_csv', len(data)) 1939 1940 if bad: 1941 return created, found, bad 1942 1943 for date, rows in sorted(data.items()): 1944 try: 1945 len_rows = len(rows) 1946 if len_rows == 1: 1947 (_, account, desc, value, date, rate, hashed) = rows[0] 1948 if rate > 0: 1949 self.exchange(account, created=date, rate=rate) 1950 if value > 0: 1951 self.track(value, desc, account, True, date) 1952 elif value < 0: 1953 self.sub(-value, desc, account, date) 1954 created += 1 1955 cache.append(hashed) 1956 continue 1957 if debug: 1958 print('-- Duplicated time detected', date, 'len', len_rows) 1959 print(rows) 1960 print('---------------------------------') 1961 # If records are found at the same time with different accounts in the same amount 1962 # (one positive and the other negative), this indicates it is a transfer. 1963 if len_rows != 2: 1964 raise Exception(f'more than two transactions({len_rows}) at the same time') 1965 (i, account1, desc1, value1, date1, rate1, _) = rows[0] 1966 (j, account2, desc2, value2, date2, rate2, _) = rows[1] 1967 if account1 == account2 or desc1 != desc2 or abs(value1) != abs(value2) or date1 != date2: 1968 raise Exception('invalid transfer') 1969 if rate1 > 0: 1970 self.exchange(account1, created=date1, rate=rate1) 1971 if rate2 > 0: 1972 self.exchange(account2, created=date2, rate=rate2) 1973 values = { 1974 value1: account1, 1975 value2: account2, 1976 } 1977 self.transfer( 1978 unscaled_amount=abs(value1), 1979 from_account=values[min(values.keys())], 1980 to_account=values[max(values.keys())], 1981 desc=desc1, 1982 created=date1, 1983 ) 1984 except Exception as e: 1985 for (i, account, desc, value, date, rate, _) in rows: 1986 bad[i] = (account, desc, value, date, rate, e) 1987 break 1988 with open(self.import_csv_cache_path(), 'w') as stream: 1989 stream.write(camel.dump(cache)) 1990 return created, found, bad
The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system.
Parameters: path (str): The path to the CSV file. Default is 'file.csv'. debug (bool): A flag indicating whether to print debug information.
Returns: tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, and a dictionary of bad transactions.
Notes:
* Currency Pair Assumption: This function assumes that the exchange rates stored for each account
are appropriate for the currency pairs involved in the conversions.
* The exchange rate for each account is based on the last encountered transaction rate that is not equal
to 1.0 or the previous rate for that account.
* Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent
transactions of the same account within the whole imported and existing dataset when doing check
and
zakat
operations.
Example Usage: The CSV file should have the following format, rate is optional per transaction: account, desc, value, date, rate For example: safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1
1996 @staticmethod 1997 def human_readable_size(size: float, decimal_places: int = 2) -> str: 1998 """ 1999 Converts a size in bytes to a human-readable format (e.g., KB, MB, GB). 2000 2001 This function iterates through progressively larger units of information 2002 (B, KB, MB, GB, etc.) and divides the input size until it fits within a 2003 range that can be expressed with a reasonable number before the unit. 2004 2005 Parameters: 2006 size (float): The size in bytes to convert. 2007 decimal_places (int, optional): The number of decimal places to display 2008 in the result. Defaults to 2. 2009 2010 Returns: 2011 str: A string representation of the size in a human-readable format, 2012 rounded to the specified number of decimal places. For example: 2013 - "1.50 KB" (1536 bytes) 2014 - "23.00 MB" (24117248 bytes) 2015 - "1.23 GB" (1325899906 bytes) 2016 """ 2017 if type(size) not in (float, int): 2018 raise TypeError("size must be a float or integer") 2019 if type(decimal_places) != int: 2020 raise TypeError("decimal_places must be an integer") 2021 for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: 2022 if size < 1024.0: 2023 break 2024 size /= 1024.0 2025 return f"{size:.{decimal_places}f} {unit}"
Converts a size in bytes to a human-readable format (e.g., KB, MB, GB).
This function iterates through progressively larger units of information (B, KB, MB, GB, etc.) and divides the input size until it fits within a range that can be expressed with a reasonable number before the unit.
Parameters: size (float): The size in bytes to convert. decimal_places (int, optional): The number of decimal places to display in the result. Defaults to 2.
Returns: str: A string representation of the size in a human-readable format, rounded to the specified number of decimal places. For example: - "1.50 KB" (1536 bytes) - "23.00 MB" (24117248 bytes) - "1.23 GB" (1325899906 bytes)
2027 @staticmethod 2028 def get_dict_size(obj: dict, seen: set = None) -> float: 2029 """ 2030 Recursively calculates the approximate memory size of a dictionary and its contents in bytes. 2031 2032 This function traverses the dictionary structure, accounting for the size of keys, values, 2033 and any nested objects. It handles various data types commonly found in dictionaries 2034 (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case 2035 of circular references. 2036 2037 Parameters: 2038 obj (dict): The dictionary whose size is to be calculated. 2039 seen (set, optional): A set used internally to track visited objects 2040 and avoid circular references. Defaults to None. 2041 2042 Returns: 2043 float: An approximate size of the dictionary and its contents in bytes. 2044 2045 Note: 2046 - This function is a method of the `ZakatTracker` class and is likely used to 2047 estimate the memory footprint of data structures relevant to Zakat calculations. 2048 - The size calculation is approximate as it relies on `sys.getsizeof()`, which might 2049 not account for all memory overhead depending on the Python implementation. 2050 - Circular references are handled to prevent infinite recursion. 2051 - Basic numeric types (int, float, complex) are assumed to have fixed sizes. 2052 - String sizes are estimated based on character length and encoding. 2053 """ 2054 size = 0 2055 if seen is None: 2056 seen = set() 2057 2058 obj_id = id(obj) 2059 if obj_id in seen: 2060 return 0 2061 2062 seen.add(obj_id) 2063 size += sys.getsizeof(obj) 2064 2065 if isinstance(obj, dict): 2066 for k, v in obj.items(): 2067 size += ZakatTracker.get_dict_size(k, seen) 2068 size += ZakatTracker.get_dict_size(v, seen) 2069 elif isinstance(obj, (list, tuple, set, frozenset)): 2070 for item in obj: 2071 size += ZakatTracker.get_dict_size(item, seen) 2072 elif isinstance(obj, (int, float, complex)): # Handle numbers 2073 pass # Basic numbers have a fixed size, so nothing to add here 2074 elif isinstance(obj, str): # Handle strings 2075 size += len(obj) * sys.getsizeof(str().encode()) # Size per character in bytes 2076 return size
Recursively calculates the approximate memory size of a dictionary and its contents in bytes.
This function traverses the dictionary structure, accounting for the size of keys, values, and any nested objects. It handles various data types commonly found in dictionaries (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case of circular references.
Parameters: obj (dict): The dictionary whose size is to be calculated. seen (set, optional): A set used internally to track visited objects and avoid circular references. Defaults to None.
Returns: float: An approximate size of the dictionary and its contents in bytes.
Note:
- This function is a method of the
ZakatTracker
class and is likely used to estimate the memory footprint of data structures relevant to Zakat calculations. - The size calculation is approximate as it relies on
sys.getsizeof()
, which might not account for all memory overhead depending on the Python implementation. - Circular references are handled to prevent infinite recursion.
- Basic numeric types (int, float, complex) are assumed to have fixed sizes.
- String sizes are estimated based on character length and encoding.
2078 @staticmethod 2079 def duration_from_nanoseconds(ns: int, 2080 show_zeros_in_spoken_time: bool = False, 2081 spoken_time_separator=',', 2082 millennia: str = 'Millennia', 2083 century: str = 'Century', 2084 years: str = 'Years', 2085 days: str = 'Days', 2086 hours: str = 'Hours', 2087 minutes: str = 'Minutes', 2088 seconds: str = 'Seconds', 2089 milli_seconds: str = 'MilliSeconds', 2090 micro_seconds: str = 'MicroSeconds', 2091 nano_seconds: str = 'NanoSeconds', 2092 ) -> tuple: 2093 """ 2094 REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 2095 Convert NanoSeconds to Human Readable Time Format. 2096 A NanoSeconds is a unit of time in the International System of Units (SI) equal 2097 to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second. 2098 Its symbol is μs, sometimes simplified to us when Unicode is not available. 2099 A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond. 2100 2101 INPUT : ms (AKA: MilliSeconds) 2102 OUTPUT: tuple(string time_lapsed, string spoken_time) like format. 2103 OUTPUT Variables: time_lapsed, spoken_time 2104 2105 Example Input: duration_from_nanoseconds(ns) 2106 **"Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds"** 2107 Example Output: ('039:0001:047:325:05:02:03:456:789:012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds') 2108 duration_from_nanoseconds(1234567890123456789012) 2109 """ 2110 us, ns = divmod(ns, 1000) 2111 ms, us = divmod(us, 1000) 2112 s, ms = divmod(ms, 1000) 2113 m, s = divmod(s, 60) 2114 h, m = divmod(m, 60) 2115 d, h = divmod(h, 24) 2116 y, d = divmod(d, 365) 2117 c, y = divmod(y, 100) 2118 n, c = divmod(c, 10) 2119 time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}" 2120 spoken_time_part = [] 2121 if n > 0 or show_zeros_in_spoken_time: 2122 spoken_time_part.append(f"{n: 3d} {millennia}") 2123 if c > 0 or show_zeros_in_spoken_time: 2124 spoken_time_part.append(f"{c: 4d} {century}") 2125 if y > 0 or show_zeros_in_spoken_time: 2126 spoken_time_part.append(f"{y: 3d} {years}") 2127 if d > 0 or show_zeros_in_spoken_time: 2128 spoken_time_part.append(f"{d: 4d} {days}") 2129 if h > 0 or show_zeros_in_spoken_time: 2130 spoken_time_part.append(f"{h: 2d} {hours}") 2131 if m > 0 or show_zeros_in_spoken_time: 2132 spoken_time_part.append(f"{m: 2d} {minutes}") 2133 if s > 0 or show_zeros_in_spoken_time: 2134 spoken_time_part.append(f"{s: 2d} {seconds}") 2135 if ms > 0 or show_zeros_in_spoken_time: 2136 spoken_time_part.append(f"{ms: 3d} {milli_seconds}") 2137 if us > 0 or show_zeros_in_spoken_time: 2138 spoken_time_part.append(f"{us: 3d} {micro_seconds}") 2139 if ns > 0 or show_zeros_in_spoken_time: 2140 spoken_time_part.append(f"{ns: 3d} {nano_seconds}") 2141 return time_lapsed, spoken_time_separator.join(spoken_time_part)
REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 Convert NanoSeconds to Human Readable Time Format. A NanoSeconds is a unit of time in the International System of Units (SI) equal to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second. Its symbol is μs, sometimes simplified to us when Unicode is not available. A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond.
INPUT : ms (AKA: MilliSeconds) OUTPUT: tuple(string time_lapsed, string spoken_time) like format. OUTPUT Variables: time_lapsed, spoken_time
Example Input: duration_from_nanoseconds(ns) "Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds" Example Output: ('039:0001:047:325:05:02:03:456:789:012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds') duration_from_nanoseconds(1234567890123456789012)
2143 @staticmethod 2144 def day_to_time(day: int, month: int = 6, year: int = 2024) -> int: # افتراض أن الشهر هو يونيو والسنة 2024 2145 """ 2146 Convert a specific day, month, and year into a timestamp. 2147 2148 Parameters: 2149 day (int): The day of the month. 2150 month (int): The month of the year. Default is 6 (June). 2151 year (int): The year. Default is 2024. 2152 2153 Returns: 2154 int: The timestamp representing the given day, month, and year. 2155 2156 Note: 2157 This method assumes the default month and year if not provided. 2158 """ 2159 return ZakatTracker.time(datetime.datetime(year, month, day))
Convert a specific day, month, and year into a timestamp.
Parameters: day (int): The day of the month. month (int): The month of the year. Default is 6 (June). year (int): The year. Default is 2024.
Returns: int: The timestamp representing the given day, month, and year.
Note: This method assumes the default month and year if not provided.
2161 @staticmethod 2162 def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime: 2163 """ 2164 Generate a random date between two given dates. 2165 2166 Parameters: 2167 start_date (datetime.datetime): The start date from which to generate a random date. 2168 end_date (datetime.datetime): The end date until which to generate a random date. 2169 2170 Returns: 2171 datetime.datetime: A random date between the start_date and end_date. 2172 """ 2173 time_between_dates = end_date - start_date 2174 days_between_dates = time_between_dates.days 2175 random_number_of_days = random.randrange(days_between_dates) 2176 return start_date + datetime.timedelta(days=random_number_of_days)
Generate a random date between two given dates.
Parameters: start_date (datetime.datetime): The start date from which to generate a random date. end_date (datetime.datetime): The end date until which to generate a random date.
Returns: datetime.datetime: A random date between the start_date and end_date.
2178 @staticmethod 2179 def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False, 2180 debug: bool = False) -> int: 2181 """ 2182 Generate a random CSV file with specified parameters. 2183 2184 Parameters: 2185 path (str): The path where the CSV file will be saved. Default is "data.csv". 2186 count (int): The number of rows to generate in the CSV file. Default is 1000. 2187 with_rate (bool): If True, a random rate between 1.2% and 12% is added. Default is False. 2188 debug (bool): A flag indicating whether to print debug information. 2189 2190 Returns: 2191 None. The function generates a CSV file at the specified path with the given count of rows. 2192 Each row contains a randomly generated account, description, value, and date. 2193 The value is randomly generated between 1000 and 100000, 2194 and the date is randomly generated between 1950-01-01 and 2023-12-31. 2195 If the row number is not divisible by 13, the value is multiplied by -1. 2196 """ 2197 if debug: 2198 print('generate_random_csv_file', f'debug={debug}') 2199 i = 0 2200 with open(path, "w", newline="") as csvfile: 2201 writer = csv.writer(csvfile) 2202 for i in range(count): 2203 account = f"acc-{random.randint(1, 1000)}" 2204 desc = f"Some text {random.randint(1, 1000)}" 2205 value = random.randint(1000, 100000) 2206 date = ZakatTracker.generate_random_date(datetime.datetime(1000, 1, 1), 2207 datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S") 2208 if not i % 13 == 0: 2209 value *= -1 2210 row = [account, desc, value, date] 2211 if with_rate: 2212 rate = random.randint(1, 100) * 0.12 2213 if debug: 2214 print('before-append', row) 2215 row.append(rate) 2216 if debug: 2217 print('after-append', row) 2218 writer.writerow(row) 2219 i = i + 1 2220 return i
Generate a random CSV file with specified parameters.
Parameters: path (str): The path where the CSV file will be saved. Default is "data.csv". count (int): The number of rows to generate in the CSV file. Default is 1000. with_rate (bool): If True, a random rate between 1.2% and 12% is added. Default is False. debug (bool): A flag indicating whether to print debug information.
Returns: None. The function generates a CSV file at the specified path with the given count of rows. Each row contains a randomly generated account, description, value, and date. The value is randomly generated between 1000 and 100000, and the date is randomly generated between 1950-01-01 and 2023-12-31. If the row number is not divisible by 13, the value is multiplied by -1.
2222 @staticmethod 2223 def create_random_list(max_sum, min_value=0, max_value=10): 2224 """ 2225 Creates a list of random integers whose sum does not exceed the specified maximum. 2226 2227 Args: 2228 max_sum: The maximum allowed sum of the list elements. 2229 min_value: The minimum possible value for an element (inclusive). 2230 max_value: The maximum possible value for an element (inclusive). 2231 2232 Returns: 2233 A list of random integers. 2234 """ 2235 result = [] 2236 current_sum = 0 2237 2238 while current_sum < max_sum: 2239 # Calculate the remaining space for the next element 2240 remaining_sum = max_sum - current_sum 2241 # Determine the maximum possible value for the next element 2242 next_max_value = min(remaining_sum, max_value) 2243 # Generate a random element within the allowed range 2244 next_element = random.randint(min_value, next_max_value) 2245 result.append(next_element) 2246 current_sum += next_element 2247 2248 return result
Creates a list of random integers whose sum does not exceed the specified maximum.
Args: max_sum: The maximum allowed sum of the list elements. min_value: The minimum possible value for an element (inclusive). max_value: The maximum possible value for an element (inclusive).
Returns: A list of random integers.
2479 def test(self, debug: bool = False) -> bool: 2480 if debug: 2481 print('test', f'debug={debug}') 2482 try: 2483 2484 self._test_core(True, debug) 2485 self._test_core(False, debug) 2486 2487 assert self._history() 2488 2489 # Not allowed for duplicate transactions in the same account and time 2490 2491 created = ZakatTracker.time() 2492 self.track(100, 'test-1', 'same', True, created) 2493 failed = False 2494 try: 2495 self.track(50, 'test-1', 'same', True, created) 2496 except: 2497 failed = True 2498 assert failed is True 2499 2500 self.reset() 2501 2502 # Same account transfer 2503 for x in [1, 'a', True, 1.8, None]: 2504 failed = False 2505 try: 2506 self.transfer(1, x, x, 'same-account', debug=debug) 2507 except: 2508 failed = True 2509 assert failed is True 2510 2511 # Always preserve box age during transfer 2512 2513 series: list[tuple] = [ 2514 (30, 4), 2515 (60, 3), 2516 (90, 2), 2517 ] 2518 case = { 2519 3000: { 2520 'series': series, 2521 'rest': 15000, 2522 }, 2523 6000: { 2524 'series': series, 2525 'rest': 12000, 2526 }, 2527 9000: { 2528 'series': series, 2529 'rest': 9000, 2530 }, 2531 18000: { 2532 'series': series, 2533 'rest': 0, 2534 }, 2535 27000: { 2536 'series': series, 2537 'rest': -9000, 2538 }, 2539 36000: { 2540 'series': series, 2541 'rest': -18000, 2542 }, 2543 } 2544 2545 selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle() 2546 2547 for total in case: 2548 if debug: 2549 print('--------------------------------------------------------') 2550 print(f'case[{total}]', case[total]) 2551 for x in case[total]['series']: 2552 self.track( 2553 unscaled_value=x[0], 2554 desc=f"test-{x} ages", 2555 account='ages', 2556 logging=True, 2557 created=selected_time * x[1], 2558 ) 2559 2560 unscaled_total = self.unscale(total) 2561 if debug: 2562 print('unscaled_total', unscaled_total) 2563 refs = self.transfer( 2564 unscaled_amount=unscaled_total, 2565 from_account='ages', 2566 to_account='future', 2567 desc='Zakat Movement', 2568 debug=debug, 2569 ) 2570 2571 if debug: 2572 print('refs', refs) 2573 2574 ages_cache_balance = self.balance('ages') 2575 ages_fresh_balance = self.balance('ages', False) 2576 rest = case[total]['rest'] 2577 if debug: 2578 print('source', ages_cache_balance, ages_fresh_balance, rest) 2579 assert ages_cache_balance == rest 2580 assert ages_fresh_balance == rest 2581 2582 future_cache_balance = self.balance('future') 2583 future_fresh_balance = self.balance('future', False) 2584 if debug: 2585 print('target', future_cache_balance, future_fresh_balance, total) 2586 print('refs', refs) 2587 assert future_cache_balance == total 2588 assert future_fresh_balance == total 2589 2590 # TODO: check boxes times for `ages` should equal box times in `future` 2591 for ref in self._vault['account']['ages']['box']: 2592 ages_capital = self._vault['account']['ages']['box'][ref]['capital'] 2593 ages_rest = self._vault['account']['ages']['box'][ref]['rest'] 2594 future_capital = 0 2595 if ref in self._vault['account']['future']['box']: 2596 future_capital = self._vault['account']['future']['box'][ref]['capital'] 2597 future_rest = 0 2598 if ref in self._vault['account']['future']['box']: 2599 future_rest = self._vault['account']['future']['box'][ref]['rest'] 2600 if ages_capital != 0 and future_capital != 0 and future_rest != 0: 2601 if debug: 2602 print('================================================================') 2603 print('ages', ages_capital, ages_rest) 2604 print('future', future_capital, future_rest) 2605 if ages_rest == 0: 2606 assert ages_capital == future_capital 2607 elif ages_rest < 0: 2608 assert -ages_capital == future_capital 2609 elif ages_rest > 0: 2610 assert ages_capital == ages_rest + future_capital 2611 self.reset() 2612 assert len(self._vault['history']) == 0 2613 2614 assert self._history() 2615 assert self._history(False) is False 2616 assert self._history() is False 2617 assert self._history(True) 2618 assert self._history() 2619 if debug: 2620 print('####################################################################') 2621 2622 transaction = [ 2623 ( 2624 20, 'wallet', 1, -2000, -2000, -2000, 1, 1, 2625 2000, 2000, 2000, 1, 1, 2626 ), 2627 ( 2628 750, 'wallet', 'safe', -77000, -77000, -77000, 2, 2, 2629 75000, 75000, 75000, 1, 1, 2630 ), 2631 ( 2632 600, 'safe', 'bank', 15000, 15000, 15000, 1, 2, 2633 60000, 60000, 60000, 1, 1, 2634 ), 2635 ] 2636 for z in transaction: 2637 self.lock() 2638 x = z[1] 2639 y = z[2] 2640 self.transfer( 2641 unscaled_amount=z[0], 2642 from_account=x, 2643 to_account=y, 2644 desc='test-transfer', 2645 debug=debug, 2646 ) 2647 zz = self.balance(x) 2648 if debug: 2649 print(zz, z) 2650 assert zz == z[3] 2651 xx = self.accounts()[x] 2652 assert xx == z[3] 2653 assert self.balance(x, False) == z[4] 2654 assert xx == z[4] 2655 2656 s = 0 2657 log = self._vault['account'][x]['log'] 2658 for i in log: 2659 s += log[i]['value'] 2660 if debug: 2661 print('s', s, 'z[5]', z[5]) 2662 assert s == z[5] 2663 2664 assert self.box_size(x) == z[6] 2665 assert self.log_size(x) == z[7] 2666 2667 yy = self.accounts()[y] 2668 assert self.balance(y) == z[8] 2669 assert yy == z[8] 2670 assert self.balance(y, False) == z[9] 2671 assert yy == z[9] 2672 2673 s = 0 2674 log = self._vault['account'][y]['log'] 2675 for i in log: 2676 s += log[i]['value'] 2677 assert s == z[10] 2678 2679 assert self.box_size(y) == z[11] 2680 assert self.log_size(y) == z[12] 2681 assert self.free(self.lock()) 2682 2683 if debug: 2684 pp().pprint(self.check(2.17)) 2685 2686 assert not self.nolock() 2687 history_count = len(self._vault['history']) 2688 if debug: 2689 print('history-count', history_count) 2690 assert history_count == 4 2691 assert not self.free(ZakatTracker.time()) 2692 assert self.free(self.lock()) 2693 assert self.nolock() 2694 assert len(self._vault['history']) == 3 2695 2696 # storage 2697 2698 _path = self.path(f'./zakat_test_db/test.{self.ext()}') 2699 if os.path.exists(_path): 2700 os.remove(_path) 2701 self.save() 2702 assert os.path.getsize(_path) > 0 2703 self.reset() 2704 assert self.recall(False, debug) is False 2705 self.load() 2706 assert self._vault['account'] is not None 2707 2708 # recall 2709 2710 assert self.nolock() 2711 assert len(self._vault['history']) == 3 2712 assert self.recall(False, debug) is True 2713 assert len(self._vault['history']) == 2 2714 assert self.recall(False, debug) is True 2715 assert len(self._vault['history']) == 1 2716 assert self.recall(False, debug) is True 2717 assert len(self._vault['history']) == 0 2718 assert self.recall(False, debug) is False 2719 assert len(self._vault['history']) == 0 2720 2721 # exchange 2722 2723 self.exchange("cash", 25, 3.75, "2024-06-25") 2724 self.exchange("cash", 22, 3.73, "2024-06-22") 2725 self.exchange("cash", 15, 3.69, "2024-06-15") 2726 self.exchange("cash", 10, 3.66) 2727 2728 for i in range(1, 30): 2729 exchange = self.exchange("cash", i) 2730 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2731 if debug: 2732 print(i, rate, description, created) 2733 assert created 2734 if i < 10: 2735 assert rate == 1 2736 assert description is None 2737 elif i == 10: 2738 assert rate == 3.66 2739 assert description is None 2740 elif i < 15: 2741 assert rate == 3.66 2742 assert description is None 2743 elif i == 15: 2744 assert rate == 3.69 2745 assert description is not None 2746 elif i < 22: 2747 assert rate == 3.69 2748 assert description is not None 2749 elif i == 22: 2750 assert rate == 3.73 2751 assert description is not None 2752 elif i >= 25: 2753 assert rate == 3.75 2754 assert description is not None 2755 exchange = self.exchange("bank", i) 2756 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2757 if debug: 2758 print(i, rate, description, created) 2759 assert created 2760 assert rate == 1 2761 assert description is None 2762 2763 assert len(self._vault['exchange']) > 0 2764 assert len(self.exchanges()) > 0 2765 self._vault['exchange'].clear() 2766 assert len(self._vault['exchange']) == 0 2767 assert len(self.exchanges()) == 0 2768 2769 # حفظ أسعار الصرف باستخدام التواريخ بالنانو ثانية 2770 self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25") 2771 self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22") 2772 self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15") 2773 self.exchange("cash", ZakatTracker.day_to_time(10), 3.66) 2774 2775 for i in [x * 0.12 for x in range(-15, 21)]: 2776 if i <= 0: 2777 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0 2778 else: 2779 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0 2780 2781 # اختبار النتائج باستخدام التواريخ بالنانو ثانية 2782 for i in range(1, 31): 2783 timestamp_ns = ZakatTracker.day_to_time(i) 2784 exchange = self.exchange("cash", timestamp_ns) 2785 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2786 if debug: 2787 print(i, rate, description, created) 2788 assert created 2789 if i < 10: 2790 assert rate == 1 2791 assert description is None 2792 elif i == 10: 2793 assert rate == 3.66 2794 assert description is None 2795 elif i < 15: 2796 assert rate == 3.66 2797 assert description is None 2798 elif i == 15: 2799 assert rate == 3.69 2800 assert description is not None 2801 elif i < 22: 2802 assert rate == 3.69 2803 assert description is not None 2804 elif i == 22: 2805 assert rate == 3.73 2806 assert description is not None 2807 elif i >= 25: 2808 assert rate == 3.75 2809 assert description is not None 2810 exchange = self.exchange("bank", i) 2811 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2812 if debug: 2813 print(i, rate, description, created) 2814 assert created 2815 assert rate == 1 2816 assert description is None 2817 2818 # csv 2819 2820 csv_count = 1000 2821 2822 for with_rate, path in { 2823 False: 'test-import_csv-no-exchange', 2824 True: 'test-import_csv-with-exchange', 2825 }.items(): 2826 2827 if debug: 2828 print('test_import_csv', with_rate, path) 2829 2830 csv_path = path + '.csv' 2831 if os.path.exists(csv_path): 2832 os.remove(csv_path) 2833 c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug) 2834 if debug: 2835 print('generate_random_csv_file', c) 2836 assert c == csv_count 2837 assert os.path.getsize(csv_path) > 0 2838 cache_path = self.import_csv_cache_path() 2839 if os.path.exists(cache_path): 2840 os.remove(cache_path) 2841 self.reset() 2842 (created, found, bad) = self.import_csv(csv_path, debug) 2843 bad_count = len(bad) 2844 assert bad_count > 0 2845 if debug: 2846 print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})") 2847 print('bad', bad) 2848 tmp_size = os.path.getsize(cache_path) 2849 assert tmp_size > 0 2850 # TODO: assert created + found + bad_count == csv_count 2851 # TODO: assert created == csv_count 2852 # TODO: assert bad_count == 0 2853 (created_2, found_2, bad_2) = self.import_csv(csv_path) 2854 bad_2_count = len(bad_2) 2855 if debug: 2856 print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})") 2857 print('bad', bad) 2858 assert bad_2_count > 0 2859 # TODO: assert tmp_size == os.path.getsize(cache_path) 2860 # TODO: assert created_2 + found_2 + bad_2_count == csv_count 2861 # TODO: assert created == found_2 2862 # TODO: assert bad_count == bad_2_count 2863 # TODO: assert found_2 == csv_count 2864 # TODO: assert bad_2_count == 0 2865 # TODO: assert created_2 == 0 2866 2867 # payment parts 2868 2869 positive_parts = self.build_payment_parts(100, positive_only=True) 2870 assert self.check_payment_parts(positive_parts) != 0 2871 assert self.check_payment_parts(positive_parts) != 0 2872 all_parts = self.build_payment_parts(300, positive_only=False) 2873 assert self.check_payment_parts(all_parts) != 0 2874 assert self.check_payment_parts(all_parts) != 0 2875 if debug: 2876 pp().pprint(positive_parts) 2877 pp().pprint(all_parts) 2878 # dynamic discount 2879 suite = [] 2880 count = 3 2881 for exceed in [False, True]: 2882 case = [] 2883 for parts in [positive_parts, all_parts]: 2884 part = parts.copy() 2885 demand = part['demand'] 2886 if debug: 2887 print(demand, part['total']) 2888 i = 0 2889 z = demand / count 2890 cp = { 2891 'account': {}, 2892 'demand': demand, 2893 'exceed': exceed, 2894 'total': part['total'], 2895 } 2896 j = '' 2897 for x, y in part['account'].items(): 2898 x_exchange = self.exchange(x) 2899 zz = self.exchange_calc(z, 1, x_exchange['rate']) 2900 if exceed and zz <= demand: 2901 i += 1 2902 y['part'] = zz 2903 if debug: 2904 print(exceed, y) 2905 cp['account'][x] = y 2906 case.append(y) 2907 elif not exceed and y['balance'] >= zz: 2908 i += 1 2909 y['part'] = zz 2910 if debug: 2911 print(exceed, y) 2912 cp['account'][x] = y 2913 case.append(y) 2914 j = x 2915 if i >= count: 2916 break 2917 if len(cp['account'][j]) > 0: 2918 suite.append(cp) 2919 if debug: 2920 print('suite', len(suite)) 2921 # vault = self._vault.copy() 2922 for case in suite: 2923 # self._vault = vault.copy() 2924 if debug: 2925 print('case', case) 2926 result = self.check_payment_parts(case) 2927 if debug: 2928 print('check_payment_parts', result, f'exceed: {exceed}') 2929 assert result == 0 2930 2931 report = self.check(2.17, None, debug) 2932 (valid, brief, plan) = report 2933 if debug: 2934 print('valid', valid) 2935 zakat_result = self.zakat(report, parts=case, debug=debug) 2936 if debug: 2937 print('zakat-result', zakat_result) 2938 assert valid == zakat_result 2939 2940 assert self.save(path + f'.{self.ext()}') 2941 assert self.export_json(path + '.json') 2942 2943 assert self.export_json("1000-transactions-test.json") 2944 assert self.save(f"1000-transactions-test.{self.ext()}") 2945 2946 self.reset() 2947 2948 # test transfer between accounts with different exchange rate 2949 2950 a_SAR = "Bank (SAR)" 2951 b_USD = "Bank (USD)" 2952 c_SAR = "Safe (SAR)" 2953 # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer 2954 for case in [ 2955 (0, a_SAR, "SAR Gift", 1000, 100000), 2956 (1, a_SAR, 1), 2957 (0, b_USD, "USD Gift", 500, 50000), 2958 (1, b_USD, 1), 2959 (2, b_USD, 3.75), 2960 (1, b_USD, 3.75), 2961 (3, 100, b_USD, a_SAR, "100 USD -> SAR", 40000, 137500), 2962 (0, c_SAR, "Salary", 750, 75000), 2963 (3, 375, c_SAR, b_USD, "375 SAR -> USD", 37500, 50000), 2964 (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 137125, 50100), 2965 ]: 2966 if debug: 2967 print('case', case) 2968 match (case[0]): 2969 case 0: # track 2970 _, account, desc, x, balance = case 2971 self.track(unscaled_value=x, desc=desc, account=account, debug=debug) 2972 2973 cached_value = self.balance(account, cached=True) 2974 fresh_value = self.balance(account, cached=False) 2975 if debug: 2976 print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value) 2977 assert cached_value == balance 2978 assert fresh_value == balance 2979 case 1: # check-exchange 2980 _, account, expected_rate = case 2981 t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2982 if debug: 2983 print('t-exchange', t_exchange) 2984 assert t_exchange['rate'] == expected_rate 2985 case 2: # do-exchange 2986 _, account, rate = case 2987 self.exchange(account, rate=rate, debug=debug) 2988 b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2989 if debug: 2990 print('b-exchange', b_exchange) 2991 assert b_exchange['rate'] == rate 2992 case 3: # transfer 2993 _, x, a, b, desc, a_balance, b_balance = case 2994 self.transfer(x, a, b, desc, debug=debug) 2995 2996 cached_value = self.balance(a, cached=True) 2997 fresh_value = self.balance(a, cached=False) 2998 if debug: 2999 print('account', a, 'cached_value', cached_value, 'fresh_value', fresh_value, 'a_balance', a_balance) 3000 assert cached_value == a_balance 3001 assert fresh_value == a_balance 3002 3003 cached_value = self.balance(b, cached=True) 3004 fresh_value = self.balance(b, cached=False) 3005 if debug: 3006 print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value) 3007 assert cached_value == b_balance 3008 assert fresh_value == b_balance 3009 3010 # Transfer all in many chunks randomly from B to A 3011 a_SAR_balance = 137125 3012 b_USD_balance = 50100 3013 b_USD_exchange = self.exchange(b_USD) 3014 amounts = ZakatTracker.create_random_list(b_USD_balance, max_value=1000) 3015 if debug: 3016 print('amounts', amounts) 3017 i = 0 3018 for x in amounts: 3019 if debug: 3020 print(f'{i} - transfer-with-exchange({x})') 3021 self.transfer( 3022 unscaled_amount=self.unscale(x), 3023 from_account=b_USD, 3024 to_account=a_SAR, 3025 desc=f"{x} USD -> SAR", 3026 debug=debug, 3027 ) 3028 3029 b_USD_balance -= x 3030 cached_value = self.balance(b_USD, cached=True) 3031 fresh_value = self.balance(b_USD, cached=False) 3032 if debug: 3033 print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3034 b_USD_balance) 3035 assert cached_value == b_USD_balance 3036 assert fresh_value == b_USD_balance 3037 3038 a_SAR_balance += int(x * b_USD_exchange['rate']) 3039 cached_value = self.balance(a_SAR, cached=True) 3040 fresh_value = self.balance(a_SAR, cached=False) 3041 if debug: 3042 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3043 a_SAR_balance, 'rate', b_USD_exchange['rate']) 3044 assert cached_value == a_SAR_balance 3045 assert fresh_value == a_SAR_balance 3046 i += 1 3047 3048 # Transfer all in many chunks randomly from C to A 3049 c_SAR_balance = 37500 3050 amounts = ZakatTracker.create_random_list(c_SAR_balance, max_value=1000) 3051 if debug: 3052 print('amounts', amounts) 3053 i = 0 3054 for x in amounts: 3055 if debug: 3056 print(f'{i} - transfer-with-exchange({x})') 3057 self.transfer( 3058 unscaled_amount=self.unscale(x), 3059 from_account=c_SAR, 3060 to_account=a_SAR, 3061 desc=f"{x} SAR -> a_SAR", 3062 debug=debug, 3063 ) 3064 3065 c_SAR_balance -= x 3066 cached_value = self.balance(c_SAR, cached=True) 3067 fresh_value = self.balance(c_SAR, cached=False) 3068 if debug: 3069 print('account', c_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3070 c_SAR_balance) 3071 assert cached_value == c_SAR_balance 3072 assert fresh_value == c_SAR_balance 3073 3074 a_SAR_balance += x 3075 cached_value = self.balance(a_SAR, cached=True) 3076 fresh_value = self.balance(a_SAR, cached=False) 3077 if debug: 3078 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3079 a_SAR_balance) 3080 assert cached_value == a_SAR_balance 3081 assert fresh_value == a_SAR_balance 3082 i += 1 3083 3084 assert self.export_json("accounts-transfer-with-exchange-rates.json") 3085 assert self.save(f"accounts-transfer-with-exchange-rates.{self.ext()}") 3086 3087 # check & zakat with exchange rates for many cycles 3088 3089 for rate, values in { 3090 1: { 3091 'in': [1000, 2000, 10000], 3092 'exchanged': [100000, 200000, 1000000], 3093 'out': [2500, 5000, 73140], 3094 }, 3095 3.75: { 3096 'in': [200, 1000, 5000], 3097 'exchanged': [75000, 375000, 1875000], 3098 'out': [1875, 9375, 137138], 3099 }, 3100 }.items(): 3101 a, b, c = values['in'] 3102 m, n, o = values['exchanged'] 3103 x, y, z = values['out'] 3104 if debug: 3105 print('rate', rate, 'values', values) 3106 for case in [ 3107 (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3108 {'safe': {0: {'below_nisab': x}}}, 3109 ], False, m), 3110 (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3111 {'safe': {0: {'count': 1, 'total': y}}}, 3112 ], True, n), 3113 (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [ 3114 {'cave': {0: {'count': 3, 'total': z}}}, 3115 ], True, o), 3116 ]: 3117 if debug: 3118 print(f"############# check(rate: {rate}) #############") 3119 print('case', case) 3120 self.reset() 3121 self.exchange(account=case[1], created=case[2], rate=rate) 3122 self.track(unscaled_value=case[0], desc='test-check', account=case[1], logging=True, created=case[2]) 3123 assert self.snapshot() 3124 3125 # assert self.nolock() 3126 # history_size = len(self._vault['history']) 3127 # print('history_size', history_size) 3128 # assert history_size == 2 3129 assert self.lock() 3130 assert not self.nolock() 3131 report = self.check(2.17, None, debug) 3132 (valid, brief, plan) = report 3133 if debug: 3134 print('brief', brief) 3135 assert valid == case[4] 3136 assert case[5] == brief[0] 3137 assert case[5] == brief[1] 3138 3139 if debug: 3140 pp().pprint(plan) 3141 3142 for x in plan: 3143 assert case[1] == x 3144 if 'total' in case[3][0][x][0].keys(): 3145 assert case[3][0][x][0]['total'] == int(brief[2]) 3146 assert int(plan[x][0]['total']) == case[3][0][x][0]['total'] 3147 assert int(plan[x][0]['count']) == case[3][0][x][0]['count'] 3148 else: 3149 assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab'] 3150 if debug: 3151 pp().pprint(report) 3152 result = self.zakat(report, debug=debug) 3153 if debug: 3154 print('zakat-result', result, case[4]) 3155 assert result == case[4] 3156 report = self.check(2.17, None, debug) 3157 (valid, brief, plan) = report 3158 assert valid is False 3159 3160 history_size = len(self._vault['history']) 3161 if debug: 3162 print('history_size', history_size) 3163 assert history_size == 3 3164 assert not self.nolock() 3165 assert self.recall(False, debug) is False 3166 self.free(self.lock()) 3167 assert self.nolock() 3168 3169 for i in range(3, 0, -1): 3170 history_size = len(self._vault['history']) 3171 if debug: 3172 print('history_size', history_size) 3173 assert history_size == i 3174 assert self.recall(False, debug) is True 3175 3176 assert self.nolock() 3177 assert self.recall(False, debug) is False 3178 3179 history_size = len(self._vault['history']) 3180 if debug: 3181 print('history_size', history_size) 3182 assert history_size == 0 3183 3184 account_size = len(self._vault['account']) 3185 if debug: 3186 print('account_size', account_size) 3187 assert account_size == 0 3188 3189 report_size = len(self._vault['report']) 3190 if debug: 3191 print('report_size', report_size) 3192 assert report_size == 0 3193 3194 assert self.nolock() 3195 return True 3196 except Exception as e: 3197 # pp().pprint(self._vault) 3198 assert self.export_json("test-snapshot.json") 3199 assert self.save(f"test-snapshot.{self.ext()}") 3200 raise e
3203def test(debug: bool = False): 3204 ledger = ZakatTracker("./zakat_test_db/zakat.camel") 3205 start = ZakatTracker.time() 3206 assert ledger.test(debug=debug) 3207 if debug: 3208 print("#########################") 3209 print("######## TEST DONE ########") 3210 print("#########################") 3211 print(ZakatTracker.duration_from_nanoseconds(ZakatTracker.time() - start)) 3212 print("#########################")
76class Action(Enum): 77 CREATE = auto() 78 TRACK = auto() 79 LOG = auto() 80 SUB = auto() 81 ADD_FILE = auto() 82 REMOVE_FILE = auto() 83 BOX_TRANSFER = auto() 84 EXCHANGE = auto() 85 REPORT = auto() 86 ZAKAT = auto()
89class JSONEncoder(json.JSONEncoder): 90 def default(self, obj): 91 if isinstance(obj, Action) or isinstance(obj, MathOperation): 92 return obj.name # Serialize as the enum member's name 93 elif isinstance(obj, Decimal): 94 return float(obj) 95 return super().default(obj)
Extensible JSON https://json.org encoder for Python data structures.
Supports the following objects and types by default:
+-------------------+---------------+ | Python | JSON | +===================+===============+ | dict | object | +-------------------+---------------+ | list, tuple | array | +-------------------+---------------+ | str | string | +-------------------+---------------+ | int, float | number | +-------------------+---------------+ | True | true | +-------------------+---------------+ | False | false | +-------------------+---------------+ | None | null | +-------------------+---------------+
To extend this to recognize other objects, subclass and implement a
.default()
method with another method that returns a serializable
object for o
if possible, otherwise it should call the superclass
implementation (to raise TypeError
).
90 def default(self, obj): 91 if isinstance(obj, Action) or isinstance(obj, MathOperation): 92 return obj.name # Serialize as the enum member's name 93 elif isinstance(obj, Decimal): 94 return float(obj) 95 return super().default(obj)
Implement this method in a subclass such that it returns
a serializable object for o
, or calls the base implementation
(to raise a TypeError
).
For example, to support arbitrary iterators, you could implement default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
# Let the base class default method raise the TypeError
return super().default(o)
55def start_file_server(database_path: str, database_callback: callable = None, csv_callback: callable = None, 56 debug: bool = False) -> tuple: 57 """ 58 Starts a multi-purpose HTTP server to manage file interactions for a Zakat application. 59 60 This server facilitates the following functionalities: 61 62 1. GET /{file_uuid}/get: Download the database file specified by `database_path`. 63 2. GET /{file_uuid}/upload: Display an HTML form for uploading files. 64 3. POST /{file_uuid}/upload: Handle file uploads, distinguishing between: 65 - Database File (.db): Replaces the existing database with the uploaded one. 66 - CSV File (.csv): Imports data from the CSV into the existing database. 67 68 Args: 69 database_path (str): The path to the pickle database file. 70 database_callback (callable, optional): A function to call after a successful database upload. 71 It receives the uploaded database path as its argument. 72 csv_callback (callable, optional): A function to call after a successful CSV upload. It receives the uploaded CSV path, 73 the database path, and the debug flag as its arguments. 74 debug (bool, optional): If True, print debugging information. Defaults to False. 75 76 Returns: 77 Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing: 78 - file_name (str): The name of the database file. 79 - download_url (str): The URL to download the database file. 80 - upload_url (str): The URL to access the file upload form. 81 - server_thread (threading.Thread): The thread running the server. 82 - shutdown_server (Callable[[], None]): A function to gracefully shut down the server. 83 84 Example: 85 _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db") 86 print(f"Download database: {download_url}") 87 print(f"Upload files: {upload_url}") 88 server_thread.start() 89 # ... later ... 90 shutdown_server() 91 """ 92 file_uuid = uuid.uuid4() 93 file_name = os.path.basename(database_path) 94 95 port = find_available_port() 96 download_url = f"http://localhost:{port}/{file_uuid}/get" 97 upload_url = f"http://localhost:{port}/{file_uuid}/upload" 98 99 class Handler(http.server.SimpleHTTPRequestHandler): 100 def do_GET(self): 101 if self.path == f"/{file_uuid}/get": 102 # GET: Serve the existing file 103 try: 104 with open(database_path, "rb") as f: 105 self.send_response(200) 106 self.send_header("Content-type", "application/octet-stream") 107 self.send_header("Content-Disposition", f'attachment; filename="{file_name}"') 108 self.end_headers() 109 self.wfile.write(f.read()) 110 except FileNotFoundError: 111 self.send_error(404, "File not found") 112 elif self.path == f"/{file_uuid}/upload": 113 # GET: Serve the upload form 114 self.send_response(200) 115 self.send_header("Content-type", "text/html") 116 self.end_headers() 117 self.wfile.write(f""" 118 <html lang="en"> 119 <head> 120 <title>Zakat File Server</title> 121 </head> 122 <body> 123 <h1>Zakat File Server</h1> 124 <h3>You can download the <a target="__blank" href="{download_url}">database file</a>...</h3> 125 <h3>Or upload a new file to restore a database or import `CSV` file:</h3> 126 <form action="/{file_uuid}/upload" method="post" enctype="multipart/form-data"> 127 <input type="file" name="file" required><br/> 128 <input type="radio" id="{FileType.Database.value}" name="upload_type" value="{FileType.Database.value}" required> 129 <label for="database">Database File</label><br/> 130 <input type="radio"id="{FileType.CSV.value}" name="upload_type" value="{FileType.CSV.value}"> 131 <label for="csv">CSV File</label><br/> 132 <input type="submit" value="Upload"><br/> 133 </form> 134 </body></html> 135 """.encode()) 136 else: 137 self.send_error(404) 138 139 def do_POST(self): 140 if self.path == f"/{file_uuid}/upload": 141 # POST: Handle request 142 # 1. Get the Form Data 143 form_data = cgi.FieldStorage( 144 fp=self.rfile, 145 headers=self.headers, 146 environ={'REQUEST_METHOD': 'POST'} 147 ) 148 upload_type = form_data.getvalue("upload_type") 149 150 if debug: 151 print('upload_type', upload_type) 152 153 if upload_type not in [FileType.Database.value, FileType.CSV.value]: 154 self.send_error(400, "Invalid upload type") 155 return 156 157 # 2. Extract File Data 158 file_item = form_data['file'] # Assuming 'file' is your file input name 159 160 # 3. Get File Details 161 filename = file_item.filename 162 file_data = file_item.file.read() # Read the file's content 163 164 if debug: 165 print(f'Uploaded filename: {filename}') 166 167 # 4. Define Storage Path for CSV 168 upload_directory = "./uploads" # Create this directory if it doesn't exist 169 os.makedirs(upload_directory, exist_ok=True) 170 file_path = os.path.join(upload_directory, upload_type) 171 172 # 5. Write to Disk 173 with open(file_path, 'wb') as f: 174 f.write(file_data) 175 176 match upload_type: 177 case FileType.Database.value: 178 179 try: 180 # 6. Verify database file 181 # ZakatTracker(db_path=file_path) # FATAL, Circular Imports Error 182 if database_callback is not None: 183 database_callback(file_path) 184 185 # 7. Copy database into the original path 186 shutil.copy2(file_path, database_path) 187 except Exception as e: 188 self.send_error(400, str(e)) 189 return 190 191 case FileType.CSV.value: 192 # 6. Verify CSV file 193 try: 194 # x = ZakatTracker(db_path=database_path) # FATAL, Circular Imports Error 195 # result = x.import_csv(file_path, debug=debug) 196 if csv_callback is not None: 197 result = csv_callback(file_path, database_path, debug) 198 if debug: 199 print(f'CSV imported: {result}') 200 if len(result[2]) != 0: 201 self.send_response(200) 202 self.end_headers() 203 self.wfile.write(json.dumps(result).encode()) 204 return 205 except Exception as e: 206 self.send_error(400, str(e)) 207 return 208 209 self.send_response(200) 210 self.end_headers() 211 self.wfile.write(b"File uploaded successfully.") 212 213 httpd = socketserver.TCPServer(("localhost", port), Handler) 214 server_thread = threading.Thread(target=httpd.serve_forever) 215 216 def shutdown_server(): 217 nonlocal httpd, server_thread 218 httpd.shutdown() 219 httpd.server_close() # Close the socket 220 server_thread.join() # Wait for the thread to finish 221 222 return file_name, download_url, upload_url, server_thread, shutdown_server
Starts a multi-purpose HTTP server to manage file interactions for a Zakat application.
This server facilitates the following functionalities:
- GET /{file_uuid}/get: Download the database file specified by
database_path
. - GET /{file_uuid}/upload: Display an HTML form for uploading files.
- POST /{file_uuid}/upload: Handle file uploads, distinguishing between:
- Database File (.db): Replaces the existing database with the uploaded one.
- CSV File (.csv): Imports data from the CSV into the existing database.
Args: database_path (str): The path to the pickle database file. database_callback (callable, optional): A function to call after a successful database upload. It receives the uploaded database path as its argument. csv_callback (callable, optional): A function to call after a successful CSV upload. It receives the uploaded CSV path, the database path, and the debug flag as its arguments. debug (bool, optional): If True, print debugging information. Defaults to False.
Returns: Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing: - file_name (str): The name of the database file. - download_url (str): The URL to download the database file. - upload_url (str): The URL to access the file upload form. - server_thread (threading.Thread): The thread running the server. - shutdown_server (Callable[[], None]): A function to gracefully shut down the server.
Example: _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db") print(f"Download database: {download_url}") print(f"Upload files: {upload_url}") server_thread.start() # ... later ... shutdown_server()
32def find_available_port() -> int: 33 """ 34 Finds and returns an available TCP port on the local machine. 35 36 This function utilizes a TCP server socket to bind to port 0, which 37 instructs the operating system to automatically assign an available 38 port. The assigned port is then extracted and returned. 39 40 Returns: 41 int: The available TCP port number. 42 43 Raises: 44 OSError: If an error occurs during the port binding process, such 45 as all ports being in use. 46 47 Example: 48 port = find_available_port() 49 print(f"Available port: {port}") 50 """ 51 with socketserver.TCPServer(("localhost", 0), None) as s: 52 return s.server_address[1]
Finds and returns an available TCP port on the local machine.
This function utilizes a TCP server socket to bind to port 0, which instructs the operating system to automatically assign an available port. The assigned port is then extracted and returned.
Returns: int: The available TCP port number.
Raises: OSError: If an error occurs during the port binding process, such as all ports being in use.
Example: port = find_available_port() print(f"Available port: {port}")