zakat
 _____     _         _     _     _ _
|__  /__ _| | ____ _| |_  | |   (_) |__  _ __ __ _ _ __ _   _
  / // _` | |/ / _` | __| | |   | | '_ \| '__/ _` | '__| | | |
 / /| (_| |   < (_| | |_  | |___| | |_) | | | (_| | |  | |_| |
/____\__,_|_|\_\__,_|\__| |_____|_|_.__/|_|  \__,_|_|   \__, |
                                                        |___/

"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف
(Surah Al-A'raf 7:89: "Our Lord, decide between us and our people in truth, and You are the best of those who give decision.")
... Never Trust, Always Verify ...

This file provides the ZakatLibrary classes and functions for tracking and calculating Zakat.
"""
 _____     _         _     _     _ _
|__  /__ _| | ____ _| |_  | |   (_) |__  _ __ __ _ _ __ _   _
  / // _` | |/ / _` | __| | |   | | '_ \| '__/ _` | '__| | | |
 / /| (_| |   < (_| | |_  | |___| | |_) | | | (_| | |  | |_| |
/____\__,_|_|\_\__,_|\__| |_____|_|_.__/|_|  \__,_|_|   \__, |
                                                        |___/

"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف
... Never Trust, Always Verify ...

This file provides the ZakatLibrary classes and functions for tracking and calculating Zakat.
"""
# Importing necessary classes and functions from the main module
from zakat.zakat_tracker import (
    ZakatTracker,
    test,
    Action,
    JSONEncoder,
    MathOperation,
)

from zakat.file_server import (
    start_file_server,
    find_available_port,
    FileType,
)

# Version information for the module
__version__ = ZakatTracker.Version()
__all__ = [
    "ZakatTracker",
    "test",
    "Action",
    "JSONEncoder",
    "MathOperation",
    "start_file_server",
    "find_available_port",
    "FileType",
]
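The `ZakatTracker` source that follows defines several small static helpers (`scale`, `unscale`, `Nisab`, `ZakatCut`, `TimeCycle`). The sketch below re-implements their arithmetic as standalone functions, so it runs without the `zakat` package installed. The lowercase function names, the sample silver price and balance, and the final threshold comparison are assumptions made for illustration only. How the library itself combines these helpers when computing Zakat is not shown in this section.

```python
from decimal import Decimal


def scale(x, decimal_places=2):
    """Turn an amount such as 12.34 into an integer count of minor units (1234)."""
    return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))


def unscale(x, decimal_places=2):
    """Inverse of scale(): integer minor units back to a float amount."""
    return round(x / (10 ** decimal_places), decimal_places)


def nisab(gram_price, gram_quantity=595):
    """Value of the Nisab threshold for a given silver price per gram."""
    return gram_price * gram_quantity


def zakat_cut(x):
    """2.5% of the given value."""
    return 0.025 * x


def time_cycle(days=355):
    """Length of the haul (one lunar year, approximated in days) in nanoseconds."""
    return int(60 * 60 * 24 * days * 1e9)


# Hypothetical inputs, chosen only for illustration.
silver_gram_price = 2.5
balance = scale(2000.0)
threshold = scale(nisab(silver_gram_price))

# Assumed rule for this sketch: Zakat applies once the balance reaches the Nisab.
due = zakat_cut(balance) if balance >= threshold else 0

print("balance (scaled):", balance)
print("nisab (scaled):", threshold)
print("zakat due:", unscale(due))
print("haul in ns:", time_cycle())
```

Working in scaled integers keeps balances free of floating-point drift. Values are converted back with `unscale` only when they are displayed.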
class ZakatTracker:
    """
    A class for tracking and calculating Zakat.

    This class provides functionalities for recording transactions, calculating Zakat due,
    and managing account balances. It also offers features like importing transactions from
    CSV files, exporting data to JSON format, and saving/loading the tracker state.

    The `ZakatTracker` class is designed to handle both positive and negative transactions,
    allowing for flexible tracking of financial activities related to Zakat. It also supports
    the concept of a "Nisab" (minimum threshold for Zakat) and a "haul" (one complete year for
    a transaction), and can calculate Zakat due based on the current silver price.

    The class uses a camel file as its database to persist the tracker state,
    ensuring data integrity across sessions. It also provides options for enabling or
    disabling history tracking, allowing users to choose their preferred level of detail.

    In addition, the `ZakatTracker` class includes various helper methods like
    `time`, `time_to_datetime`, `lock`, `free`, `recall`, `export_json`,
    and more. These methods provide additional functionalities and flexibility
    for interacting with and managing the Zakat tracker.

    Attributes:
        ZakatTracker.ZakatCut (function): A function to calculate the Zakat percentage.
        ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat.
        ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price.
        ZakatTracker.Version (function): The version of the ZakatTracker class.

    Data Structure:
        The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data.

        _vault (dict):
            - account (dict):
                - {account_number} (dict):
                    - balance (int): The current balance of the account.
                    - box (dict): A dictionary storing transaction details.
                        - {timestamp} (dict):
                            - capital (int): The initial amount of the transaction.
                            - count (int): The number of times Zakat has been calculated for this transaction.
                            - last (int): The timestamp of the last Zakat calculation.
                            - rest (int): The remaining amount after Zakat deductions and withdrawal.
                            - total (int): The total Zakat deducted from this transaction.
                    - count (int): The total number of transactions for the account.
                    - log (dict): A dictionary storing transaction logs.
                        - {timestamp} (dict):
                            - value (int): The transaction amount (positive or negative).
                            - desc (str): The description of the transaction.
                            - ref (int): The box reference (positive or None).
                            - file (dict): A dictionary storing file references associated with the transaction.
                    - hide (bool): Indicates whether the account is hidden or not.
                    - zakatable (bool): Indicates whether the account is subject to Zakat.
            - exchange (dict):
                - account (dict):
                    - {timestamps} (dict):
                        - rate (float): Exchange rate when compared to local currency.
                        - description (str): The description of the exchange rate.
            - history (dict):
                - {timestamp} (list): A list of dictionaries storing the history of actions performed.
                    - {action_dict} (dict):
                        - action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT).
                        - account (str): The account number associated with the action.
                        - ref (int): The reference number of the transaction.
                        - file (int): The reference number of the file (if applicable).
                        - key (str): The key associated with the action (e.g., 'rest', 'total').
                        - value (int): The value associated with the action.
                        - math (MathOperation): The mathematical operation performed (if applicable).
            - lock (int or None): The timestamp indicating the current lock status (None if not locked).
            - report (dict):
                - {timestamp} (tuple): A tuple storing Zakat report details.

    """

    @staticmethod
    def Version() -> str:
        """
        Returns the current version of the software.

        This function returns a string representing the current version of the software,
        including major, minor, and patch version numbers in the format "X.Y.Z".

        Returns:
        str: The current version of the software.
        """
        return '0.2.84'

    @staticmethod
    def ZakatCut(x: float) -> float:
        """
        Calculates the Zakat amount due on an asset.

        This function calculates the zakat amount due on a given asset value over one lunar year.
        Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth
        that exceeds a certain threshold (Nisab).

        Parameters:
        x: The total value of the asset on which Zakat is to be calculated.

        Returns:
        The amount of Zakat due on the asset, calculated as 2.5% of the asset's value.
        """
        return 0.025 * x  # Zakat Cut in one Lunar Year

    @staticmethod
    def TimeCycle(days: int = 355) -> int:
        """
        Calculates the approximate duration of a lunar year in nanoseconds.

        This function calculates the approximate duration of a lunar year based on the given number of days.
        It converts the given number of days into nanoseconds for use in high-precision timing applications.

        Parameters:
        days: The number of days in a lunar year. Defaults to 355,
              which is an approximation of the average length of a lunar year.

        Returns:
        The approximate duration of a lunar year in nanoseconds.
        """
        return int(60 * 60 * 24 * days * 1e9)  # Lunar Year in nanoseconds

    @staticmethod
    def Nisab(gram_price: float, gram_quantity: float = 595) -> float:
        """
        Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram.

        This function calculates the Nisab value, which is the minimum threshold of wealth,
        that makes an individual liable for paying Zakat.
        The Nisab value is determined by the equivalent value of a specific amount
        of gold or silver (currently 595 grams in silver) in the local currency.

        Parameters:
        - gram_price (float): The price per gram of Nisab.
        - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver.

        Returns:
        - float: The total value of Nisab based on the given price per gram.
        """
        return gram_price * gram_quantity

    @staticmethod
    def ext() -> str:
        """
        Returns the file extension used by the ZakatTracker class.

        Returns:
        str: The file extension used by the ZakatTracker class, which is 'camel'.
        """
        return 'camel'

    def __init__(self, db_path: str = "./zakat_db/zakat.camel", history_mode: bool = True):
        """
        Initialize ZakatTracker with database path and history mode.

        Parameters:
        db_path (str): The path to the database file. Default is "./zakat_db/zakat.camel".
        history_mode (bool): The mode for tracking history. Default is True.

        Returns:
        None
        """
        self._base_path = None
        self._vault_path = None
        self._vault = None
        self.reset()
        self._history(history_mode)
        self.path(db_path)

    def path(self, path: str = None) -> str:
        """
        Set or get the path to the database file.

        If no path is provided, the current path is returned.
        If a path is provided, it is set as the new path.
        The function also creates the necessary directories if the provided path is a file.

        Parameters:
        path (str): The new path to the database file. If not provided, the current path is returned.

        Returns:
        str: The current or new path to the database file.
        """
        if path is None:
            return self._vault_path
        self._vault_path = Path(path).resolve()
        base_path = Path(path).resolve()
        if base_path.is_file() or base_path.suffix:
            base_path = base_path.parent
        base_path.mkdir(parents=True, exist_ok=True)
        self._base_path = base_path
        return str(self._vault_path)

    def base_path(self, *args) -> str:
        """
        Generate a base path by joining the provided arguments with the existing base path.

        Parameters:
        *args (str): Variable length argument list of strings to be joined with the base path.

        Returns:
        str: The generated base path. If no arguments are provided, the existing base path is returned.
        """
        if not args:
            return str(self._base_path)
        filtered_args = []
        ignored_filename = None
        for arg in args:
            if Path(arg).suffix:
                ignored_filename = arg
            else:
                filtered_args.append(arg)
        base_path = Path(self._base_path)
        full_path = base_path.joinpath(*filtered_args)
        full_path.mkdir(parents=True, exist_ok=True)
        if ignored_filename is not None:
            # Join with the ignored filename; convert to str to match the declared return type
            return str(full_path.resolve() / ignored_filename)
        return str(full_path.resolve())

    @staticmethod
    def scale(x: float | int | Decimal, decimal_places: int = 2) -> int:
        """
        Scales a numerical value by a specified power of 10, returning an integer.

        This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and
        facilitate precise scaling operations, particularly useful in financial or scientific calculations.

        Parameters:
        x: The numeric value to scale. Can be a floating-point number, integer, or decimal.
        decimal_places: The exponent for the scaling factor (10**decimal_places). Defaults to 2, meaning the input is scaled
            by a factor of 100 (e.g., converts 1.23 to 123).

        Returns:
        The scaled value, rounded to the nearest integer.

        Raises:
        TypeError: If the input `x` is not a valid numeric type.

        Examples:
        >>> ZakatTracker.scale(3.14159)
        314
        >>> ZakatTracker.scale(1234, decimal_places=3)
        1234000
        >>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4)
        50
        """
        if not isinstance(x, (float, int, Decimal)):
            raise TypeError("Input 'x' must be a float, int, or Decimal.")
        return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))

    @staticmethod
    def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal:
        """
        Unscales an integer by a power of 10.

        Parameters:
        x: The integer to unscale.
        return_type: The desired type for the returned value. Can be float or Decimal. Defaults to float.
        decimal_places: The power of 10 to use. Defaults to 2.

        Returns:
        The unscaled number, converted to the specified return_type.

        Raises:
        TypeError: If the return_type is not float or Decimal.
        """
        if return_type not in (float, Decimal):
            raise TypeError(f'Invalid return_type({return_type}). Supported types are float and Decimal.')
        return round(return_type(x / (10 ** decimal_places)), decimal_places)

    def _history(self, status: bool = None) -> bool:
        """
        Enable or disable history tracking, or query its current status.

        Parameters:
        status (bool): The new status of history tracking. If None (the default), the status is left unchanged.

        Returns:
        bool: The current status of history tracking.
        """
        if status is not None:
            self._history_mode = status
        return self._history_mode

    def reset(self) -> None:
        """
        Reset the internal data structure to its initial state.

        Parameters:
        None

        Returns:
        None
        """
        self._vault = {
            'account': {},
            'exchange': {},
            'history': {},
            'lock': None,
            'report': {},
        }

    @staticmethod
    def time(now: datetime = None) -> int:
        """
        Generates a timestamp based on the provided datetime object or the current datetime.

        Parameters:
        now (datetime, optional): The datetime object to generate the timestamp from.
        If not provided, the current datetime is used.

        Returns:
        int: The timestamp in nanoseconds since the Unix epoch (January 1, 1970).
            Dates before 1970 produce negative values.
        """
        if now is None:
            now = datetime.datetime.now()
        ordinal_day = now.toordinal()
        ns_in_day = (now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() * 10 ** 9
        # 719_163 is the proleptic Gregorian ordinal of 1970-01-01
        return int((ordinal_day - 719_163) * 86_400_000_000_000 + ns_in_day)

    @staticmethod
    def time_to_datetime(ordinal_ns: int) -> datetime:
        """
        Converts a nanosecond timestamp (as produced by `time`) to a datetime object.

        Parameters:
        ordinal_ns (int): The number of nanoseconds since the Unix epoch (January 1, 1970).

        Returns:
        datetime: The corresponding datetime object, truncated to whole seconds.
        """
        ordinal_day = ordinal_ns // 86_400_000_000_000 + 719_163
        ns_in_day = ordinal_ns % 86_400_000_000_000
        d = datetime.datetime.fromordinal(ordinal_day)
        t = datetime.timedelta(seconds=ns_in_day // 10 ** 9)
        return datetime.datetime.combine(d, datetime.time()) + t

    def clean_history(self, lock: int | None = None) -> int:
        """
        Cleans up the history of actions performed on the ZakatTracker instance.

        Parameters:
        lock (int, optional): The lock ID is used to clean up the empty history.
            If not provided, it cleans up the empty history records for all locks.

        Returns:
        int: The number of locks cleaned up.
        """
        count = 0
        if lock in self._vault['history']:
            if len(self._vault['history'][lock]) <= 0:
                count += 1
                del self._vault['history'][lock]
                return count
        self.free(self.lock())
        # Iterate over a snapshot of the keys: deleting from a dict while
        # iterating over it directly raises RuntimeError.
        for lock in list(self._vault['history']):
            if len(self._vault['history'][lock]) <= 0:
                count += 1
                del self._vault['history'][lock]
        return count

    def _step(self, action: Action = None, account=None, ref: int = None, file: int = None, value: float = None,
              key: str = None, math_operation: MathOperation = None) -> int:
        """
        This method is responsible for recording the actions performed on the ZakatTracker.

        Parameters:
        - action (Action): The type of action performed.
        - account (str): The account number on which the action was performed.
        - ref (int): The reference number of the action.
        - file (int): The file reference number of the action.
        - value (int): The value associated with the action.
        - key (str): The key associated with the action.
        - math_operation (MathOperation): The mathematical operation performed during the action.

        Returns:
        - int: The lock time of the recorded action. If no lock was performed, it returns 0.
        """
        if not self._history():
            return 0
        lock = self._vault['lock']
        if self.nolock():
            lock = self._vault['lock'] = self.time()
            self._vault['history'][lock] = []
        if action is None:
            return lock
        self._vault['history'][lock].append({
            'action': action,
            'account': account,
            'ref': ref,
            'file': file,
            'key': key,
            'value': value,
            'math': math_operation,
        })
        return lock

    def nolock(self) -> bool:
        """
        Check if the vault lock is currently not set.

        Returns:
        bool: True if the vault lock is not set, False otherwise.
        """
        return self._vault['lock'] is None

    def lock(self) -> int:
        """
        Acquires a lock on the ZakatTracker instance.

        Returns:
        int: The lock ID.
            This ID can be used to release the lock later.
        """
        return self._step()

    def vault(self) -> dict:
        """
        Returns a shallow copy of the internal vault dictionary.

        This method is used to retrieve the current state of the ZakatTracker object.
        It provides a snapshot of the internal data structure, allowing for further
        processing or analysis. Because the copy is shallow, the nested dictionaries
        are still shared with the tracker.

        Returns:
        dict: A shallow copy of the internal vault dictionary.
        """
        return self._vault.copy()

    def stats(self) -> dict[str, tuple]:
        """
        Calculates and returns statistics about the object's data storage.

        This method determines the size of the database file on disk and the
        size of the data currently held in RAM (likely within a dictionary).
        Both sizes are reported in bytes and in a human-readable format
        (e.g., KB, MB).

        Returns:
        dict[str, tuple]: A dictionary containing the following statistics:

            * 'database': A tuple with two elements:
                - The database file size in bytes (int).
                - The database file size in human-readable format (str).
            * 'ram': A tuple with two elements:
                - The RAM usage (dictionary size) in bytes (int).
                - The RAM usage in human-readable format (str).

        Example:
        >>> stats = my_object.stats()
        >>> print(stats['database'])
        (256000, '250.0 KB')
        >>> print(stats['ram'])
        (12345, '12.1 KB')
        """
        ram_size = self.get_dict_size(self.vault())
        file_size = os.path.getsize(self.path())
        return {
            'database': (file_size, self.human_readable_size(file_size)),
            'ram': (ram_size, self.human_readable_size(ram_size)),
        }

    def files(self) -> list[dict[str, str | int]]:
        """
        Retrieves information about files associated with this class.

        This method provides a standardized way to gather details about
        files used by the class for storage, snapshots, and CSV imports.

        Returns:
        list[dict[str, str | int]]: A list of dictionaries, each containing information
            about a specific file:

            * type (str): The type of file ('database', 'snapshot', 'import_csv').
            * path (str): The full file path.
            * exists (bool): Whether the file exists on the filesystem.
            * size (int): The file size in bytes (0 if the file doesn't exist).
            * human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB').

        Example:
        ```
        file_info = MyClass.files()
        for info in file_info:
            print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}")
        ```
        """
        result = []
        for file_type, path in {
            'database': self.path(),
            'snapshot': self.snapshot_cache_path(),
            'import_csv': self.import_csv_cache_path(),
        }.items():
            exists = os.path.exists(path)
            size = os.path.getsize(path) if exists else 0
            human_readable_size = self.human_readable_size(size) if exists else 0
            result.append({
                'type': file_type,
                'path': path,
                'exists': exists,
                'size': size,
                'human_readable_size': human_readable_size,
            })
        return result

    def steps(self) -> dict:
        """
        Returns a copy of the history of steps taken in the ZakatTracker.

        The history is a dictionary where each key is a unique identifier for a step,
        and the corresponding value is a dictionary containing information about the step.

        Returns:
        dict: A copy of the history of steps taken in the ZakatTracker.
        """
        return self._vault['history'].copy()

    def free(self, lock: int, auto_save: bool = True) -> bool:
        """
        Releases the lock on the database.

        Parameters:
        lock (int): The lock ID to be released.
        auto_save (bool): Whether to automatically save the database after releasing the lock.

        Returns:
        bool: True if the lock is successfully released and (optionally) saved, False otherwise.
        """
        if lock == self._vault['lock']:
            self._vault['lock'] = None
            self.clean_history(lock)
            if auto_save:
                return self.save(self.path())
            return True
        return False

    def account_exists(self, account) -> bool:
        """
        Check if the given account exists in the vault.

        Parameters:
        account (str): The account number to check.

        Returns:
        bool: True if the account exists, False otherwise.
        """
        return account in self._vault['account']

    def box_size(self, account) -> int:
        """
        Calculate the size of the box for a specific account.

        Parameters:
        account (str): The account number for which the box size needs to be calculated.

        Returns:
        int: The size of the box for the given account. If the account does not exist, -1 is returned.
        """
        if self.account_exists(account):
            return len(self._vault['account'][account]['box'])
        return -1

    def log_size(self, account) -> int:
        """
        Get the size of the log for a specific account.

        Parameters:
        account (str): The account number for which the log size needs to be calculated.

        Returns:
        int: The size of the log for the given account. If the account does not exist, -1 is returned.
        """
        if self.account_exists(account):
            return len(self._vault['account'][account]['log'])
        return -1

    @staticmethod
    def file_hash(file_path: str, algorithm: str = "blake2b") -> str:
        """
        Calculates the hash of a file using the specified algorithm.

        Parameters:
        file_path (str): The path to the file.
        algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b".

        Returns:
        str: The hexadecimal representation of the file's hash.
        """
        hash_obj = hashlib.new(algorithm)  # Create the hash object
        with open(file_path, "rb") as f:  # Open file in binary mode for reading
            for chunk in iter(lambda: f.read(4096), b""):  # Read file in chunks
                hash_obj.update(chunk)
        return hash_obj.hexdigest()  # Return the hash as a hexadecimal string

    def snapshot_cache_path(self):
        """
        Generate the path for the cache file used to store snapshots.

        The cache file is a camel file that stores the timestamps of the snapshots.
        The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel".

        Returns:
        str: The path to the cache file.
        """
        path = str(self.path())
        ext = self.ext()
        ext_len = len(ext)
        if path.endswith(f'.{ext}'):
            path = path[:-ext_len-1]
        _, filename = os.path.split(path + f'.snapshots.{ext}')
        return self.base_path(filename)

    def snapshot(self) -> bool:
        """
        This function creates a snapshot of the current database state.

        The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists.
        If a snapshot with the same hash exists, the function returns True without creating a new snapshot.
        If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state
        in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp.

        Parameters:
        None

        Returns:
        bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails.
        """
        current_hash = self.file_hash(self.path())
        cache: dict[str, int] = {}  # hash: time_ns
        try:
            with open(self.snapshot_cache_path(), 'r') as stream:
                cache = camel.load(stream.read())
        except Exception:
            # A missing or unreadable cache file means no snapshots are recorded yet
            pass
        if current_hash in cache:
            return True
        time = time_ns()
        cache[current_hash] = time
        if not self.save(self.base_path('snapshots', f'{time}.{self.ext()}')):
            return False
        with open(self.snapshot_cache_path(), 'w') as stream:
            stream.write(camel.dump(cache))
        return True

    def snapshots(self, hide_missing: bool = True, verified_hash_only: bool = False) \
            -> dict[int, tuple[str, str, bool]]:
        """
        Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status.

        Parameters:
        - hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True.
        - verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False.

        Returns:
        - dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots,
        and the values are tuples containing the snapshot's hash, path, and existence status.
        """
        cache: dict[str, int] = {}  # hash: time_ns
        try:
            with open(self.snapshot_cache_path(), 'r') as stream:
                cache = camel.load(stream.read())
        except Exception:
            # A missing or unreadable cache file means no snapshots are recorded yet
            pass
        if not cache:
            return {}
        result: dict[int, tuple[str, str, bool]] = {}  # time_ns: (hash, path, exists)
        for file_hash, ref in cache.items():
            path = self.base_path('snapshots', f'{ref}.{self.ext()}')
            exists = os.path.exists(path)
            # Hash only files that exist; hashing a missing snapshot would raise FileNotFoundError
            valid_hash = (exists and self.file_hash(path) == file_hash) if verified_hash_only else True
            if verified_hash_only and not valid_hash:
                continue
            if exists or not hide_missing:
                result[ref] = (file_hash, path, exists)
        return result

    def recall(self, dry=True, debug=False) -> bool:
        """
        Revert the last operation.

        Parameters:
        dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True.
        debug (bool): If True, the function will print debug information. Default is False.

        Returns:
        bool: True if the operation was successful, False otherwise.
        """
        if not self.nolock() or len(self._vault['history']) == 0:
            return False
        if len(self._vault['history']) <= 0:
            return False
        ref = sorted(self._vault['history'].keys())[-1]
        if debug:
            print('recall', ref)
        memory = self._vault['history'][ref]
        if debug:
            print(type(memory), 'memory', memory)
        limit = len(memory) + 1
        sub_positive_log_negative = 0
        for i in range(-1, -limit, -1):
            x = memory[i]
            if debug:
                print(type(x), x)
            match x['action']:
                case Action.CREATE:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if debug:
                                print('account', self._vault['account'][x['account']])
                            assert len(self._vault['account'][x['account']]['box']) == 0
                            assert self._vault['account'][x['account']]['balance'] == 0
                            assert self._vault['account'][x['account']]['count'] == 0
                            if dry:
                                continue
                            del self._vault['account'][x['account']]

                case Action.TRACK:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if dry:
                                continue
                            self._vault['account'][x['account']]['balance'] -= x['value']
                            self._vault['account'][x['account']]['count'] -= 1
                            del self._vault['account'][x['account']]['box'][x['ref']]

                case Action.LOG:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['log']:
                                if dry:
                                    continue
                                if sub_positive_log_negative == -x['value']:
                                    self._vault['account'][x['account']]['count'] -= 1
                                    sub_positive_log_negative = 0
                                box_ref = self._vault['account'][x['account']]['log'][x['ref']]['ref']
                                if box_ref is not None:
                                    assert self.box_exists(x['account'], box_ref)
                                    box_value = self._vault['account'][x['account']]['log'][x['ref']]['value']
                                    assert box_value < 0

                                    try:
                                        self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value
                                    except TypeError:
                                        self._vault['account'][x['account']]['box'][box_ref]['rest'] += Decimal(
                                            -box_value)

                                    try:
                                        self._vault['account'][x['account']]['balance'] += -box_value
                                    except TypeError:
                                        self._vault['account'][x['account']]['balance'] += Decimal(-box_value)

                                    self._vault['account'][x['account']]['count'] -= 1
                                del self._vault['account'][x['account']]['log'][x['ref']]

                case Action.SUB:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['box']:
                                if dry:
                                    continue
                                self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value']
                                self._vault['account'][x['account']]['balance'] += x['value']
                                sub_positive_log_negative = x['value']

                case Action.ADD_FILE:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['log']:
                                if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']:
                                    if dry:
                                        continue
                                    del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']]

                case Action.REMOVE_FILE:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['log']:
                                if dry:
                                    continue
                                self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value']

                case Action.BOX_TRANSFER:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['box']:
                                if dry:
                                    continue
                                self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value']

                case Action.EXCHANGE:
                    if x['account'] is not None:
                        if x['account'] in self._vault['exchange']:
                            if x['ref'] in self._vault['exchange'][x['account']]:
                                if dry:
                                    continue
                                del self._vault['exchange'][x['account']][x['ref']]

                case Action.REPORT:
                    if x['ref'] in self._vault['report']:
                        if dry:
                            continue
                        del self._vault['report'][x['ref']]

                case Action.ZAKAT:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['box']:
                                if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]:
                                    if dry:
                                        continue
                                    match x['math']:
                                        case MathOperation.ADDITION:
                                            self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x[
                                                'value']
                                        case MathOperation.EQUAL:
                                            self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value']
                                        case MathOperation.SUBTRACTION:
                                            self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x[
                                                'value']

        if not dry:
            del self._vault['history'][ref]
        return True

    def ref_exists(self, account: str, ref_type: str, ref: int) -> bool:
        """
        Check if a specific reference (transaction) exists in the vault for a given account and reference type.

        Parameters:
        account (str): The account number for which to check the existence of the reference.
        ref_type (str): The type of reference (e.g., 'box', 'log', etc.).
        ref (int): The reference (transaction) number to check for existence.

        Returns:
        bool: True if the reference exists for the given account and reference type, False otherwise.
        """
        if account in self._vault['account']:
            return ref in self._vault['account'][account][ref_type]
        return False

    def box_exists(self, account: str, ref: int) -> bool:
        """
        Check if a specific box (transaction) exists in the vault for a given account and reference.

        Parameters:
        - account (str): The account number for which to check the existence of the box.
        - ref (int): The reference (transaction) number to check for existence.

        Returns:
        - bool: True if the box exists for the given account and reference, False otherwise.
        """
        return self.ref_exists(account, 'box', ref)

    def track(self, unscaled_value: float | int | Decimal = 0, desc: str = '', account: str = 1, logging: bool = True,
              created: int = None,
              debug: bool = False) -> int:
        """
        Track a transaction for a specific account.

        Parameters:
        unscaled_value (float | int | Decimal): The value of the transaction. Default is 0.
        desc (str): The description of the transaction. Default is an empty string.
        account (str): The account for which the transaction is being tracked. Default is 1.
        logging (bool): Whether to log the transaction. Default is True.
        created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None.
        debug (bool): Whether to print debug information. Default is False.

        Returns:
        int: The timestamp of the transaction, or 0 if unscaled_value is 0 (only the account is created).

        This function creates the account if it doesn't exist, logs the transaction if logging is True,
        and updates the account's balance and box.

        Raises:
        ValueError: A log entry already exists for the same nanosecond timestamp.
        ValueError: A box already exists for the same nanosecond timestamp.
        """
        if debug:
            print('track', f'unscaled_value={unscaled_value}, debug={debug}')
        if created is None:
            created = self.time()
        no_lock = self.nolock()
        self.lock()
        if not self.account_exists(account):
            if debug:
                print(f"account {account} created")
            self._vault['account'][account] = {
                'balance': 0,
                'box': {},
                'count': 0,
                'log': {},
                'hide': False,
                'zakatable': True,
            }
            self._step(Action.CREATE, account)
        if unscaled_value == 0:
            if no_lock:
                self.free(self.lock())
            return 0
        value = self.scale(unscaled_value)
        if logging:
            self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug)
        if debug:
            print('create-box', created)
        if self.box_exists(account, created):
            raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).")
        if debug:
            print('created-box', created)
        self._vault['account'][account]['box'][created] = {
            'capital': value,
            'count': 0,
            'last': 0,
            'rest': value,
            'total': 0,
        }
        self._step(Action.TRACK, account, ref=created, value=value)
        if no_lock:
            self.free(self.lock())
        return created

    def log_exists(self, account: str, ref: int) -> bool:
        """
        Checks if a specific transaction log entry exists for a given account.

        Parameters:
        account (str): The account number associated with the transaction log.
        ref (int): The reference to the transaction log entry.

        Returns:
        bool: True if the transaction log entry exists, False otherwise.
        """
        return self.ref_exists(account, 'log', ref)

    def _log(self, value: float, desc: str = '', account: str = 1, created: int = None, ref: int = None,
             debug: bool = False) -> int:
        """
        Log a transaction into the account's log.

        Parameters:
        value (float): The value of the transaction.
        desc (str): The description of the transaction.
        account (str): The account to log the transaction into. Default is 1.
        created (int): The timestamp of the transaction. If not provided, it will be generated.
        ref (int): The reference of the related box, stored with the log entry. Default is None.
        debug (bool): Whether to print debug information. Default is False.

        Returns:
        int: The timestamp of the logged transaction.

        This method updates the account's balance, count, and log with the transaction details.
        It also creates a step in the history of the transaction.

        Raises:
        ValueError: A log entry already exists for the same nanosecond timestamp.
        """
        if debug:
            print('_log', f'debug={debug}')
        if created is None:
            created = self.time()
        try:
            self._vault['account'][account]['balance'] += value
        except TypeError:
            self._vault['account'][account]['balance'] += Decimal(value)
        self._vault['account'][account]['count'] += 1
        if debug:
            print('create-log', created)
        if self.log_exists(account, created):
            raise ValueError(f"The log transaction happened again in the same nanosecond time({created}).")
        if debug:
            print('created-log', created)
        self._vault['account'][account]['log'][created] = {
            'value': value,
            'desc': desc,
            'ref': ref,
            'file': {},
        }
        self._step(Action.LOG, account, ref=created, value=value)
        return created

    def exchange(self, account, created: int = None, rate: float = None, description: str = None,
                 debug: bool = False) -> dict:
        """
        Record or retrieve exchange rates for a specific account.

        Parameters:
        - account (str): The account number for which the exchange rate is being recorded or retrieved.
        - created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used.
        - rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate.
        - description (str): A description of the exchange rate.

        Returns:
        - dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found,
        it returns a dictionary with default values for the rate and description.
        """
        if debug:
            print('exchange', f'debug={debug}')
        if created is None:
            created = self.time()
        no_lock = self.nolock()
        self.lock()
        if rate is not None:
            # NOTE: the two early returns below exit without releasing the lock acquired above.
            if rate <= 0:
                return dict()
            if account not in self._vault['exchange']:
                self._vault['exchange'][account] = {}
            if len(self._vault['exchange'][account]) == 0 and rate <= 1:
                return {"time": created, "rate": 1, "description": None}
            self._vault['exchange'][account][created] = {"rate": rate, "description": description}
            self._step(Action.EXCHANGE, account, ref=created, value=rate)
            if no_lock:
                self.free(self.lock())
            if debug:
                print("exchange-created-1",
                      f'account: {account}, created: {created}, rate:{rate}, description:{description}')

        if account in self._vault['exchange']:
            valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created]
            if valid_rates:
                latest_rate = max(valid_rates, key=lambda x: x[0])
                if debug:
                    print("exchange-read-1",
                          f'account: {account}, created: {created}, rate:{rate}, description:{description}',
                          'latest_rate', latest_rate)
                result = latest_rate[1]
                result['time'] = latest_rate[0]
                return result  # Return a dictionary containing the rate and the description
        if debug:
            print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}')
        return {"time": created, "rate": 1, "description": None}  # Return the default value with an empty description

    @staticmethod
    def exchange_calc(x: float, x_rate: float, y_rate: float) -> float:
        """
        This function calculates the exchanged amount of a currency.

        Args:
        x (float): The original amount of the currency.
        x_rate (float): The exchange rate of the original currency.
        y_rate (float): The exchange rate of the target currency.

        Returns:
        float: The exchanged amount of the target currency.
        """
        return (x * x_rate) / y_rate

    def exchanges(self) -> dict:
        """
        Retrieve the recorded exchange rates for all accounts.

        Parameters:
        None

        Returns:
        dict: A dictionary containing all recorded exchange rates.
        The keys are account names or numbers, and the values are dictionaries containing the exchange rates.
        Each exchange rate dictionary has timestamps as keys and exchange rate details as values.
        """
        return self._vault['exchange'].copy()

    def accounts(self) -> dict:
        """
        Returns a dictionary containing account numbers as keys and their respective balances as values.

        Parameters:
        None

        Returns:
        dict: A dictionary where keys are account numbers and values are their respective balances.
        """
        result = {}
        for i in self._vault['account']:
            result[i] = self._vault['account'][i]['balance']
        return result

    def boxes(self, account) -> dict:
        """
        Retrieve the boxes (transactions) associated with a specific account.

        Parameters:
        account (str): The account number for which to retrieve the boxes.

        Returns:
        dict: A dictionary containing the boxes associated with the given account.
        If the account does not exist, an empty dictionary is returned.
        """
        if self.account_exists(account):
            return self._vault['account'][account]['box']
        return {}

    def logs(self, account) -> dict:
        """
        Retrieve the logs (transactions) associated with a specific account.

        Parameters:
        account (str): The account number for which to retrieve the logs.

        Returns:
        dict: A dictionary containing the logs associated with the given account.
        If the account does not exist, an empty dictionary is returned.
        """
        if self.account_exists(account):
            return self._vault['account'][account]['log']
        return {}

    def add_file(self, account: str, ref: int, path: str) -> int:
        """
        Adds a file reference to a specific transaction log entry in the vault.

        Parameters:
        account (str): The account number associated with the transaction log.
        ref (int): The reference to the transaction log entry.
        path (str): The path of the file to be added.

        Returns:
        int: The reference of the added file. If the account or transaction log entry does not exist, returns 0.
        """
        if self.account_exists(account):
            if ref in self._vault['account'][account]['log']:
                file_ref = self.time()
                self._vault['account'][account]['log'][ref]['file'][file_ref] = path
                no_lock = self.nolock()
                self.lock()
                self._step(Action.ADD_FILE, account, ref=ref, file=file_ref)
                if no_lock:
                    self.free(self.lock())
                return file_ref
        return 0

    def remove_file(self, account: str, ref: int, file_ref: int) -> bool:
        """
        Removes a file reference from a specific transaction log entry in the vault.

        Parameters:
        account (str): The account number associated with the transaction log.
        ref (int): The reference to the transaction log entry.
        file_ref (int): The reference of the file to be removed.

        Returns:
        bool: True if the file reference is successfully removed, False otherwise.
        """
        if self.account_exists(account):
            if ref in self._vault['account'][account]['log']:
                if file_ref in self._vault['account'][account]['log'][ref]['file']:
                    x = self._vault['account'][account]['log'][ref]['file'][file_ref]
                    del self._vault['account'][account]['log'][ref]['file'][file_ref]
                    no_lock = self.nolock()
                    self.lock()
                    self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x)
                    if no_lock:
                        self.free(self.lock())
                    return True
        return False

    def balance(self, account: str = 1, cached: bool = True) -> int:
        """
        Calculate and return the balance of a specific account.

        Parameters:
        account (str): The account number. Default is 1.
        cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True.

        Returns:
        int: The balance of the account.

        Note:
        If cached is True, the function returns the cached balance.
        If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items.
        """
        if cached:
            return self._vault['account'][account]['balance']
        # Sum the remaining value of every box; unlike the previous list-based
        # running total, this also returns 0 for an account with no boxes.
        return sum(y['rest'] for y in self._vault['account'][account]['box'].values())

    def hide(self, account, status: bool = None) -> bool:
        """
        Check or set the hide status of a specific account.

        Parameters:
        account (str): The account number.
        status (bool, optional): The new hide status. If not provided, the function will return the current status.

        Returns:
        bool: The current or updated hide status of the account.

        Raises:
        None

        Example:
        >>> tracker = ZakatTracker()
        >>> ref = tracker.track(51, 'desc', 'account1')
        >>> tracker.hide('account1')  # Get the hide status of 'account1' (False by default)
        False
        >>> tracker.hide('account1', True)  # Set the hide status of 'account1' to True
        True
        >>> tracker.hide('account1')  # Get the updated hide status of 'account1'
        True
        >>> tracker.hide('account1', False)
        False
        """
        if self.account_exists(account):
            if status is None:
                return self._vault['account'][account]['hide']
            self._vault['account'][account]['hide'] = status
            return status
        return False

    def zakatable(self, account, status: bool = None) -> bool:
        """
        Check or set the zakatable status of a specific account.

        Parameters:
        account (str): The account number.
        status (bool, optional): The new zakatable status. If not provided, the function will return the current status.

        Returns:
        bool: The current or updated zakatable status of the account.

        Raises:
        None

        Example:
        >>> tracker = ZakatTracker()
        >>> ref = tracker.track(51, 'desc', 'account1')
        >>> tracker.zakatable('account1')  # Get the zakatable status of 'account1' (True by default)
        True
        >>> tracker.zakatable('account1', True)  # Set the zakatable status of 'account1' to True
        True
        >>> tracker.zakatable('account1')  # Get the current zakatable status of 'account1'
        True
        >>> tracker.zakatable('account1', False)
        False
        """
        if self.account_exists(account):
            if status is None:
                return self._vault['account'][account]['zakatable']
            self._vault['account'][account]['zakatable'] = status
            return status
        return False

    def sub(self, unscaled_value: float | int | Decimal, desc: str = '', account: str = 1, created: int = None,
            debug: bool = False) \
            -> tuple[
                int,
                list[
                    tuple[int, int],
                ],
            ] | tuple:
        """
        Subtracts a specified value from an account's balance.

        Parameters:
        unscaled_value (float | int | Decimal): The amount to be subtracted.
        desc (str): A description for the transaction. Defaults to an empty string.
        account (str): The account from which the value will be subtracted. Defaults to 1.
        created (int): The timestamp of the transaction. If not provided, the current timestamp will be used.
        debug (bool): A flag indicating whether to print debug information. Defaults to False.

        Returns:
        tuple: A tuple containing the timestamp of the transaction and a list of (box reference, amount taken)
        tuples, one for each box the value was drawn from. An empty tuple is returned if unscaled_value is negative.

        If the amount to subtract is greater than the account's balance,
        the remaining amount will be transferred to a new transaction with a negative value.

        Raises:
        ValueError: A box already exists for the same nanosecond timestamp.
        ValueError: A log entry already exists for the same nanosecond timestamp.
        """
        if debug:
            print('sub', f'debug={debug}')
        if unscaled_value < 0:
            return tuple()
        if unscaled_value == 0:
            ref = self.track(unscaled_value, '', account)
            return ref, ref
        if created is None:
            created = self.time()
        no_lock = self.nolock()
        self.lock()
        self.track(0, '', account)
        value = self.scale(unscaled_value)
        self._log(value=-value, desc=desc, account=account, created=created, ref=None, debug=debug)
        ids = sorted(self._vault['account'][account]['box'].keys())
        limit = len(ids) + 1
        target = value
        if debug:
            print('ids', ids)
        ages = []
        for i in range(-1, -limit, -1):
            if target == 0:
                break
            j = ids[i]
            if debug:
                print('i', i, 'j', j)
            rest = self._vault['account'][account]['box'][j]['rest']
            if rest >= target:
                self._vault['account'][account]['box'][j]['rest'] -= target
                self._step(Action.SUB, account, ref=j, value=target)
                ages.append((j, target))
                target = 0
                break
            elif target > rest > 0:
                chunk = rest
                target -= chunk
                self._step(Action.SUB, account, ref=j, value=chunk)
                ages.append((j, chunk))
                self._vault['account'][account]['box'][j]['rest'] = 0
        if target > 0:
            self.track(
                unscaled_value=self.unscale(-target),
                desc=desc,
                account=account,
                logging=False,
                created=created,
            )
            ages.append((created, target))
        if no_lock:
            self.free(self.lock())
        return created, ages

    def transfer(self, unscaled_amount: float | int | Decimal, from_account: str, to_account: str, desc: str = '',
                 created: int = None,
                 debug: bool = False) -> list[int]:
        """
        Transfers a specified value from one account to another.

        Parameters:
        unscaled_amount (float | int | Decimal): The amount to be transferred.
        from_account (str): The account from which the value will be transferred.
        to_account (str): The account to which the value will be transferred.
        desc (str, optional): A description for the transaction. Defaults to an empty string.
        created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used.
        debug (bool): A flag indicating whether to print debug information. Defaults to False.

        Returns:
        list[int]: A list of timestamps corresponding to the transactions made during the transfer.

        Raises:
        ValueError: Transfer to the same account is forbidden.
        ValueError: The box transaction happened again in the same nanosecond time.
        ValueError: The log transaction happened again in the same nanosecond time.
        """
        if debug:
            print('transfer', f'debug={debug}')
        if from_account == to_account:
            raise ValueError(f'Transfer to the same account is forbidden. {to_account}')
        if unscaled_amount <= 0:
            return []
        if created is None:
            created = self.time()
        (_, ages) = self.sub(unscaled_amount, desc, from_account, created, debug=debug)
        times = []
        source_exchange = self.exchange(from_account, created)
        target_exchange = self.exchange(to_account, created)

        if debug:
            print('ages', ages)

        for age, value in ages:
            target_amount = int(self.exchange_calc(value, source_exchange['rate'], target_exchange['rate']))
            if debug:
                print('target_amount', target_amount)
            # Perform the transfer
            if self.box_exists(to_account, age):
                if debug:
                    print('box_exists', age)
                capital = self._vault['account'][to_account]['box'][age]['capital']
                rest = self._vault['account'][to_account]['box'][age]['rest']
                if debug:
                    print(
                        f"Transfer(loop) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).")
                selected_age = age
                if rest + target_amount > capital:
                    self._vault['account'][to_account]['box'][age]['capital'] += target_amount
                    selected_age = ZakatTracker.time()
                self._vault['account'][to_account]['box'][age]['rest'] += target_amount
                self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount)
                y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account,
                              created=None, ref=None, debug=debug)
                times.append((age, y))
                continue
            if debug:
                print(
                    f"Transfer(func) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).")
            y = self.track(
                unscaled_value=self.unscale(int(target_amount)),
                desc=desc,
                account=to_account,
                logging=True,
                created=age,
                debug=debug,
            )
            times.append(y)
        return times

    def check(self, silver_gram_price: float, unscaled_nisab: float | int | Decimal = None, debug: bool = False, now: int = None,
              cycle: float = None) -> tuple:
        """
        Check the eligibility for Zakat based on the given parameters.

        Parameters:
        silver_gram_price (float): The price of a gram of silver.
        unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided,
            it will be calculated based on the silver_gram_price.
        debug (bool): Flag to enable debug mode.
        now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time().
        cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle().

        Returns:
        tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics,
        and a dictionary containing the Zakat plan.
        """
        if debug:
            print('check', f'debug={debug}')
        if now is None:
            now = self.time()
        if cycle is None:
            cycle = ZakatTracker.TimeCycle()
        if unscaled_nisab is None:
            unscaled_nisab = ZakatTracker.Nisab(silver_gram_price)
        nisab = self.scale(unscaled_nisab)
        plan = {}
        below_nisab = 0
        brief = [0, 0, 0]
        valid = False
        if debug:
            print('exchanges', self.exchanges())
        for x in self._vault['account']:
            if not self.zakatable(x):
                continue
            _box = self._vault['account'][x]['box']
            _log = self._vault['account'][x]['log']
            limit = len(_box) + 1
            ids = sorted(self._vault['account'][x]['box'].keys())
            for i in range(-1, -limit, -1):
                j = ids[i]
                rest = float(_box[j]['rest'])
                if rest <= 0:
                    continue
                exchange = self.exchange(x, created=self.time())
                rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1)
                brief[0] += rest
                index = limit + i - 1
                epoch = (now - j) / cycle
                if debug:
                    print(f"Epoch: {epoch}", _box[j])
                if _box[j]['last'] > 0:
                    epoch = (now - _box[j]['last']) / cycle
                if debug:
                    print(f"Epoch: {epoch}")
                epoch = floor(epoch)
                if debug:
                    print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch)
                if epoch == 0:
                    continue
                if debug:
                    print("Epoch - PASSED")
                brief[1] += rest
                if rest >= nisab:
                    total = 0
                    for _ in range(epoch):
                        total += ZakatTracker.ZakatCut(float(rest) - float(total))
                    if total > 0:
                        if x not in plan:
                            plan[x] = {}
                        valid = True
                        brief[2] += total
                        plan[x][index] = {
                            'total': total,
                            'count': epoch,
                            'box_time': j,
                            'box_capital': _box[j]['capital'],
                            'box_rest': _box[j]['rest'],
                            'box_last': _box[j]['last'],
                            'box_total': _box[j]['total'],
                            'box_count': _box[j]['count'],
                            'box_log': _log[j]['desc'],
                            'exchange_rate': exchange['rate'],
                            'exchange_time': exchange['time'],
                            'exchange_desc': exchange['description'],
                        }
                else:
                    chunk = ZakatTracker.ZakatCut(float(rest))
                    if chunk > 0:
                        if x not in plan:
                            plan[x] = {}
                        # The plan is keyed by box index, so test `index` here
                        # (the previous test on the box timestamp `j` never matched).
                        if index not in plan[x]:
                            plan[x][index] = {}
                        below_nisab += rest
                        brief[2] += chunk
                        plan[x][index]['below_nisab'] = chunk
                        plan[x][index]['total'] = chunk
                        plan[x][index]['count'] = epoch
                        plan[x][index]['box_time'] = j
                        plan[x][index]['box_capital'] = _box[j]['capital']
                        plan[x][index]['box_rest'] = _box[j]['rest']
                        plan[x][index]['box_last'] = _box[j]['last']
                        plan[x][index]['box_total'] = _box[j]['total']
                        plan[x][index]['box_count'] = _box[j]['count']
                        plan[x][index]['box_log'] = _log[j]['desc']
                        plan[x][index]['exchange_rate'] = exchange['rate']
                        plan[x][index]['exchange_time'] = exchange['time']
                        plan[x][index]['exchange_desc'] = exchange['description']
        valid = valid or below_nisab >= nisab
        if debug:
            print(f"below_nisab({below_nisab}) >= nisab({nisab})")
        return valid, brief, plan

    def build_payment_parts(self, demand: float, positive_only: bool = True) -> dict:
        """
        Build payment parts for the Zakat distribution.

        Parameters:
        demand (float): The total demand for payment in local currency.
        positive_only (bool): If True, only consider accounts with positive balance. Default is True.

        Returns:
        dict: A dictionary containing the payment parts for each account. The dictionary has the following structure:
        {
            'account': {
                'account_id': {'balance': float, 'rate': float, 'part': float},
                ...
            },
            'exceed': bool,
            'demand': float,
            'total': float,
        }
        """
        total = 0
        parts = {
            'account': {},
            'exceed': False,
            'demand': demand,
        }
        for x, y in self.accounts().items():
            if positive_only and y <= 0:
                continue
            total += float(y)
            exchange = self.exchange(x)
            parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0}
        parts['total'] = total
        return parts

    @staticmethod
    def check_payment_parts(parts: dict, debug: bool = False) -> int:
        """
        Checks the validity of payment parts.

        Parameters:
        parts (dict): A dictionary containing payment parts information.
        debug (bool): Flag to enable debug mode.

        Returns:
        int: Returns 0 if the payment parts are valid, otherwise returns the error code.

        Error Codes:
        1: 'demand', 'account', 'total', or 'exceed' key is missing in parts.
        2: 'balance', 'rate' or 'part' key is missing in parts['account'][x].
        3: 'part' value in parts['account'][x] is less than 0.
        4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0.
        5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value.
        6: The sum of 'part' values in parts['account'] does not match with 'demand' value.
        """
        if debug:
            print('check_payment_parts', f'debug={debug}')
        for i in ['demand', 'account', 'total', 'exceed']:
            if i not in parts:
                return 1
        exceed = parts['exceed']
        for x in parts['account']:
            # Confirm all required keys exist before reading their values,
            # so a missing key yields error code 2 rather than a KeyError.
            for j in ['balance', 'rate', 'part']:
                if j not in parts['account'][x]:
                    return 2
            if parts['account'][x]['part'] < 0:
                return 3
            if not exceed and parts['account'][x]['balance'] <= 0:
                return 4
        demand = parts['demand']
        z = 0
        for _, y in parts['account'].items():
            if not exceed and y['part'] > y['balance']:
                return 5
            z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1)
        z = round(z, 2)
        demand = round(demand, 2)
        if debug:
            print('check_payment_parts', f'z = {z}, demand = {demand}')
            print('check_payment_parts', type(z), type(demand))
            print('check_payment_parts', z != demand)
            print('check_payment_parts', str(z) != str(demand))
        if z != demand and str(z) != str(demand):
            return 6
        return 0

    def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool:
        """
        Perform Zakat calculation based on the given report and optional parts.

        Parameters:
        report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan.
        parts (dict): A dictionary containing the payment parts for the zakat.
        debug (bool): A flag indicating whether to print debug information.

        Returns:
        bool: True if the zakat calculation is successful, False otherwise.
        """
        if debug:
            print('zakat', f'debug={debug}')
        valid, _, plan = report
        if not valid:
            return valid
        parts_exist = parts is not None
        if parts_exist:
            if self.check_payment_parts(parts, debug=debug) != 0:
                return False
        if debug:
            print('######### zakat #######')
            print('parts_exist', parts_exist)
        no_lock = self.nolock()
        self.lock()
        report_time = self.time()
        self._vault['report'][report_time] = report
        self._step(Action.REPORT, ref=report_time)
        created = self.time()
        for x in plan:
            target_exchange = self.exchange(x)
            if debug:
                print(plan[x])
                print('-------------')
                print(self._vault['account'][x]['box'])
            ids = sorted(self._vault['account'][x]['box'].keys())
            if debug:
                print('plan[x]', plan[x])
            for i in plan[x].keys():
                j = ids[i]
                if debug:
                    print('i', i, 'j', j)
                self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'],
                           key='last',
                           math_operation=MathOperation.EQUAL)
                self._vault['account'][x]['box'][j]['last'] = created
                amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate']))
                self._vault['account'][x]['box'][j]['total'] += amount
                self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total',
                           math_operation=MathOperation.ADDITION)
                self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count']
                self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count',
                           math_operation=MathOperation.ADDITION)
                if not parts_exist:
                    try:
                        self._vault['account'][x]['box'][j]['rest'] -= amount
                    except TypeError:
                        self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount)
                    # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest',
                    #            math_operation=MathOperation.SUBTRACTION)
                    self._log(-float(amount), desc='zakat-زكاة', account=x, created=None, ref=j, debug=debug)
        if parts_exist:
            for account, part in parts['account'].items():
                if part['part'] == 0:
                    continue
                if debug:
                    print('zakat-part', account, part['rate'])
                target_exchange = self.exchange(account)
                amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate'])
                self.sub(amount, desc='zakat-part-دفعة-زكاة', account=account, debug=debug)
        if no_lock:
            self.free(self.lock())
        return True

    def export_json(self, path: str = "data.json") -> bool:
        """
        Exports the current state of the ZakatTracker object to a JSON file.

        Parameters:
        path (str): The path where the JSON file will be saved. Default is "data.json".

        Returns:
        bool: True if the export is successful.

        Raises:
        OSError: If the file at `path` cannot be opened for writing.
        """
        with open(path, "w") as file:
            json.dump(self._vault, file, indent=4, cls=JSONEncoder)
            return True

    def save(self, path: str = None) -> bool:
        """
        Saves the ZakatTracker's current state to a camel file.

        This method serializes the internal data (`_vault`).

        Parameters:
        path (str, optional): File path for saving. Defaults to a predefined location.

        Returns:
        bool: True if the save operation is successful, False otherwise.
        """
        if path is None:
            path = self.path()
        with open(f'{path}.tmp', 'w') as stream:
            # first save in tmp file
            stream.write(camel.dump(self._vault))
        # then move tmp file to original location (after the tmp file is closed and flushed)
        shutil.move(f'{path}.tmp', path)
        return True

    def load(self, path: str = None) -> bool:
        """
        Load the current state of the ZakatTracker object from a camel file.

        Parameters:
        path (str): The path where the camel file is located. If not provided, it will use the default path.

        Returns:
        bool: True if the load operation is successful, False otherwise.
        """
        if path is None:
            path = self.path()
        if os.path.exists(path):
            with open(path, 'r') as stream:
                self._vault = camel.load(stream.read())
                return True
        return False

    def import_csv_cache_path(self):
        """
        Generates the cache file path for imported CSV data.

        This function constructs the file path where cached data from CSV imports
        will be stored. The cache file is a camel file (.camel extension) appended
        to the base path of the object.

        Returns:
        str: The full path to the import CSV cache file.

        Example:
        >>> obj = ZakatTracker('/data/reports')
        >>> obj.import_csv_cache_path()
        '/data/reports.import_csv.camel'
        """
        path = str(self.path())
        ext = self.ext()
        ext_len = len(ext)
        if path.endswith(f'.{ext}'):
            path = path[:-ext_len-1]
        _, filename = os.path.split(path + f'.import_csv.{ext}')
        return self.base_path(filename)

    def import_csv(self, path: str = 'file.csv', debug: bool = False) -> tuple:
        """
        The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system.

        Parameters:
        path (str): The path to the CSV file. Default is 'file.csv'.
        debug (bool): A flag indicating whether to print debug information.

        Returns:
        tuple: A tuple containing the number of transactions created, the number of transactions found in the cache,
        and a dictionary of bad transactions.

        Notes:
        * Currency Pair Assumption: This function assumes that the exchange rates stored for each account
            are appropriate for the currency pairs involved in the conversions.
        * The exchange rate for each account is based on the last encountered transaction rate that is not equal
            to 1.0 or the previous rate for that account.
            * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent
                transactions of the same account within the whole imported and existing dataset when doing `check` and
                `zakat` operations.

        Example Usage:
            The CSV file should have the following format, rate is optional per transaction:
            account, desc, value, date, rate
            For example:
            safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1
        """
        if debug:
            print('import_csv', f'debug={debug}')
        cache: list[int] = []
        try:
            with open(self.import_csv_cache_path(), 'r') as stream:
                cache = camel.load(stream.read())
        except Exception:
            # a missing or unreadable cache file means nothing has been imported yet
            pass
        date_formats = [
            "%Y-%m-%d %H:%M:%S",
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H%M%S",
            "%Y-%m-%d",
        ]
        created, found, bad = 0, 0, {}
        data: dict[int, list] = {}
        with open(path, newline='', encoding="utf-8") as f:
            i = 0
            for row in csv.reader(f, delimiter=','):
                i += 1
                # NOTE: hash() of str values is salted per interpreter process unless PYTHONHASHSEED is fixed,
                # so these cache keys are only stable within a single run.
                hashed = hash(tuple(row))
                if hashed in cache:
                    found += 1
                    continue
                account = row[0]
                desc = row[1]
                value = float(row[2])
                rate = 1.0
                if row[4:5]:  # Empty list if index is out of range
                    rate = float(row[4])
                date: int = 0
                for time_format in date_formats:
                    try:
                        date = self.time(datetime.datetime.strptime(row[3], time_format))
                        break
                    except Exception:
                        # try the next date format
                        pass
                # TODO: not allowed for negative dates in the future after enhance time functions
                if date == 0 or value == 0:
                    # csv.reader yields lists, so append the reason as a list item
                    bad[i] = row + ['invalid date']
                    continue
                if date not in data:
                    data[date] = []
                data[date].append((i, account, desc, value, date, rate, hashed))

        if debug:
            print('import_csv', len(data))

        if bad:
            return created, found, bad

        for date, rows in sorted(data.items()):
            try:
                len_rows = len(rows)
                if len_rows == 1:
                    (_, account, desc, value, date, rate, hashed) = rows[0]
                    if rate > 0:
                        self.exchange(account, created=date, rate=rate)
                    if value > 0:
                        self.track(value, desc, account, True, date)
                    elif value < 0:
                        self.sub(-value, desc, account, date)
                    created += 1
                    cache.append(hashed)
                    continue
                if debug:
                    print('-- Duplicated time detected', date, 'len', len_rows)
                    print(rows)
                    print('---------------------------------')
                # If records are found at the same time with different accounts in the same amount
                # (one positive and the other negative), this indicates it is a transfer.
                if len_rows != 2:
                    raise Exception(f'more than two transactions({len_rows}) at the same time')
                (i, account1, desc1, value1, date1, rate1, _) = rows[0]
                (j, account2, desc2, value2, date2, rate2, _) = rows[1]
                if account1 == account2 or desc1 != desc2 or abs(value1) != abs(value2) or date1 != date2:
                    raise Exception('invalid transfer')
                if rate1 > 0:
                    self.exchange(account1, created=date1, rate=rate1)
                if rate2 > 0:
                    self.exchange(account2, created=date2, rate=rate2)
                values = {
                    value1: account1,
                    value2: account2,
                }
                self.transfer(
                    unscaled_amount=abs(value1),
                    from_account=values[min(values.keys())],
                    to_account=values[max(values.keys())],
                    desc=desc1,
                    created=date1,
                )
            except Exception as e:
                for (i, account, desc, value, date, rate, _) in rows:
                    bad[i] = (account, desc, value, date, rate, e)
                break
        with open(self.import_csv_cache_path(), 'w') as stream:
            stream.write(camel.dump(cache))
        return created, found, bad

    ########
    # TESTS #
    #######

    @staticmethod
    def human_readable_size(size: float, decimal_places: int = 2) -> str:
        """
        Converts a size in bytes to a human-readable format (e.g., KB, MB, GB).

        This function iterates through progressively larger units of information
        (B, KB, MB, GB, etc.) and divides the input size until it fits within a
        range that can be expressed with a reasonable number before the unit.

        Parameters:
        size (float): The size in bytes to convert.
        decimal_places (int, optional): The number of decimal places to display
            in the result. Defaults to 2.

        Returns:
        str: A string representation of the size in a human-readable format,
            rounded to the specified number of decimal places. For example:
                - "1.50 KB" (1536 bytes)
                - "23.00 MB" (24117248 bytes)
                - "1.23 GB" (1325899906 bytes)
        """
        if type(size) not in (float, int):
            raise TypeError("size must be a float or integer")
        if type(decimal_places) != int:
            raise TypeError("decimal_places must be an integer")
        for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
            if size < 1024.0:
                break
            size /= 1024.0
        return f"{size:.{decimal_places}f} {unit}"

    @staticmethod
    def get_dict_size(obj: dict, seen: set = None) -> float:
        """
        Recursively calculates the approximate memory size of a dictionary and its contents in bytes.

        This function traverses the dictionary structure, accounting for the size of keys, values,
        and any nested objects. It handles various data types commonly found in dictionaries
        (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case
        of circular references.

        Parameters:
        obj (dict): The dictionary whose size is to be calculated.
        seen (set, optional): A set used internally to track visited objects
            and avoid circular references. Defaults to None.

        Returns:
        float: An approximate size of the dictionary and its contents in bytes.

        Note:
        - This function is a method of the `ZakatTracker` class and is likely used to
          estimate the memory footprint of data structures relevant to Zakat calculations.
        - The size calculation is approximate as it relies on `sys.getsizeof()`, which might
          not account for all memory overhead depending on the Python implementation.
        - Circular references are handled to prevent infinite recursion.
        - Basic numeric types (int, float, complex) are assumed to have fixed sizes.
        - String sizes are estimated based on character length and encoding.
        """
        size = 0
        if seen is None:
            seen = set()

        obj_id = id(obj)
        if obj_id in seen:
            return 0

        seen.add(obj_id)
        size += sys.getsizeof(obj)

        if isinstance(obj, dict):
            for k, v in obj.items():
                size += ZakatTracker.get_dict_size(k, seen)
                size += ZakatTracker.get_dict_size(v, seen)
        elif isinstance(obj, (list, tuple, set, frozenset)):
            for item in obj:
                size += ZakatTracker.get_dict_size(item, seen)
        elif isinstance(obj, (int, float, complex)):  # Handle numbers
            pass  # Basic numbers have a fixed size, so nothing to add here
        elif isinstance(obj, str):  # Handle strings
            # rough extra allowance per character (the size of an empty bytes object), on top of getsizeof(obj)
            size += len(obj) * sys.getsizeof(str().encode())
        return size

    @staticmethod
    def duration_from_nanoseconds(ns: int,
                                  show_zeros_in_spoken_time: bool = False,
                                  spoken_time_separator=',',
                                  millennia: str = 'Millennia',
                                  century: str = 'Century',
                                  years: str = 'Years',
                                  days: str = 'Days',
                                  hours: str = 'Hours',
                                  minutes: str = 'Minutes',
                                  seconds: str = 'Seconds',
                                  milli_seconds: str = 'MilliSeconds',
                                  micro_seconds: str = 'MicroSeconds',
                                  nano_seconds: str = 'NanoSeconds',
                                  ) -> tuple:
        """
        REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106
        Convert NanoSeconds to Human Readable Time Format.
        A microsecond is a unit of time in the International System of Units (SI) equal
        to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second.
        Its symbol is μs, sometimes simplified to us when Unicode is not available.
        A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond.

        INPUT : ns (AKA: NanoSeconds)
        OUTPUT: tuple(string time_lapsed, string spoken_time) like format.
        OUTPUT Variables: time_lapsed, spoken_time

        Example Input: duration_from_nanoseconds(ns)
        **"Millennium:Century:Years:Days:Hours:Minutes:Seconds::MilliSeconds::MicroSeconds::NanoSeconds"**
        Example Output: ('039:0001:047:325:05:02:03::456::789::012', ' 39 Millennia,   1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds')
        duration_from_nanoseconds(1234567890123456789012)
        """
        us, ns = divmod(ns, 1000)
        ms, us = divmod(us, 1000)
        s, ms = divmod(ms, 1000)
        m, s = divmod(s, 60)
        h, m = divmod(m, 60)
        d, h = divmod(h, 24)
        y, d = divmod(d, 365)
        c, y = divmod(y, 100)
        n, c = divmod(c, 10)
        time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}"
        spoken_time_part = []
        if n > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{n: 3d} {millennia}")
        if c > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{c: 4d} {century}")
        if y > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{y: 3d} {years}")
        if d > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{d: 4d} {days}")
        if h > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{h: 2d} {hours}")
        if m > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{m: 2d} {minutes}")
        if s > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{s: 2d} {seconds}")
        if ms > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{ms: 3d} {milli_seconds}")
        if us > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{us: 3d} {micro_seconds}")
        if ns > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{ns: 3d} {nano_seconds}")
        return time_lapsed, spoken_time_separator.join(spoken_time_part)

    @staticmethod
    def day_to_time(day: int, month: int = 6, year: int = 2024) -> int:  # assume the month is June and the year is 2024
        """
        Convert a specific day, month, and year into a timestamp.

        Parameters:
        day (int): The day of the month.
        month (int): The month of the year. Default is 6 (June).
        year (int): The year. Default is 2024.

        Returns:
        int: The timestamp representing the given day, month, and year.

        Note:
        This method assumes the default month and year if not provided.
        """
        return ZakatTracker.time(datetime.datetime(year, month, day))

    @staticmethod
    def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime:
        """
        Generate a random date between two given dates.

        Parameters:
        start_date (datetime.datetime): The start date from which to generate a random date.
        end_date (datetime.datetime): The end date until which to generate a random date.

        Returns:
        datetime.datetime: A random date between the start_date and end_date.
        """
        time_between_dates = end_date - start_date
        days_between_dates = time_between_dates.days
        random_number_of_days = random.randrange(days_between_dates)
        return start_date + datetime.timedelta(days=random_number_of_days)

    @staticmethod
    def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False,
                                 debug: bool = False) -> int:
        """
        Generate a random CSV file with specified parameters.

        Parameters:
        path (str): The path where the CSV file will be saved. Default is "data.csv".
        count (int): The number of rows to generate in the CSV file. Default is 1000.
        with_rate (bool): If True, a random rate between 0.12 and 12 is appended to each row. Default is False.
        debug (bool): A flag indicating whether to print debug information.

        Returns:
        int: The number of rows written. The function generates a CSV file at the specified path
            with the given count of rows.
            Each row contains a randomly generated account, description, value, and date.
            The value is randomly generated between 1000 and 100000,
            and the date is randomly generated between 1000-01-01 and 2023-12-31.
            If the row number is not divisible by 13, the value is multiplied by -1.
        """
        if debug:
            print('generate_random_csv_file', f'debug={debug}')
        i = 0
        with open(path, "w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            for i in range(count):
                account = f"acc-{random.randint(1, 1000)}"
                desc = f"Some text {random.randint(1, 1000)}"
                value = random.randint(1000, 100000)
                date = ZakatTracker.generate_random_date(datetime.datetime(1000, 1, 1),
                                                         datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S")
                if not i % 13 == 0:
                    value *= -1
                row = [account, desc, value, date]
                if with_rate:
                    rate = random.randint(1, 100) * 0.12
                    if debug:
                        print('before-append', row)
                    row.append(rate)
                    if debug:
                        print('after-append', row)
                writer.writerow(row)
                i = i + 1
        return i

    @staticmethod
    def create_random_list(max_sum, min_value=0, max_value=10):
        """
        Creates a list of random integers whose sum does not exceed the specified maximum.

        Args:
            max_sum: The maximum allowed sum of the list elements.
            min_value: The minimum possible value for an element (inclusive).
            max_value: The maximum possible value for an element (inclusive).

        Returns:
            A list of random integers.
        """
        result = []
        current_sum = 0

        while current_sum < max_sum:
            # Calculate the remaining space for the next element
            remaining_sum = max_sum - current_sum
            # Determine the maximum possible value for the next element
            next_max_value = min(remaining_sum, max_value)
            # Generate a random element within the allowed range
            next_element = random.randint(min_value, next_max_value)
            result.append(next_element)
            current_sum += next_element

        return result

    def _test_core(self, restore=False, debug=False):

        if debug:
            random.seed(1234567890)

        # sanity check - random forward time

        xlist = []
        limit = 1000
        for _ in range(limit):
            y = ZakatTracker.time()
            z = '-'
            if y not in xlist:
                xlist.append(y)
            else:
                z = 'x'
            if debug:
                print(z, y)
        xx = len(xlist)
        if debug:
            print('count', xx, ' - unique: ', (xx / limit) * 100, '%')
        assert limit == xx

        # sanity check - convert date since 1000AD

        for year in range(1000, 9000):
            ns = ZakatTracker.time(datetime.datetime.strptime(f"{year}-12-30 18:30:45", "%Y-%m-%d %H:%M:%S"))
            date = ZakatTracker.time_to_datetime(ns)
            if debug:
                print(date)
            assert date.year == year
            assert date.month == 12
            assert date.day == 30
            assert date.hour == 18
            assert date.minute == 30
            assert date.second in [44, 45]

        # human_readable_size

        assert ZakatTracker.human_readable_size(0) == "0.00 B"
        assert ZakatTracker.human_readable_size(512) == "512.00 B"
        assert ZakatTracker.human_readable_size(1023) == "1023.00 B"

        assert ZakatTracker.human_readable_size(1024) == "1.00 KB"
        assert ZakatTracker.human_readable_size(2048) == "2.00 KB"
        assert ZakatTracker.human_readable_size(5120) == "5.00 KB"

        assert ZakatTracker.human_readable_size(1024 ** 2) == "1.00 MB"
        assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2) == "2.50 MB"

        assert ZakatTracker.human_readable_size(1024 ** 3) == "1.00 GB"
        assert ZakatTracker.human_readable_size(1024 ** 4) == "1.00 TB"
        assert ZakatTracker.human_readable_size(1024 ** 5) == "1.00 PB"

        assert ZakatTracker.human_readable_size(1536, decimal_places=0) == "2 KB"
        assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2, decimal_places=1) == "2.5 MB"
        assert ZakatTracker.human_readable_size(1234567890, decimal_places=3) == "1.150 GB"

        try:
            # noinspection PyTypeChecker
            ZakatTracker.human_readable_size("not a number")
            assert False, "Expected TypeError for invalid input"
        except TypeError:
            pass

        try:
            # noinspection PyTypeChecker
            ZakatTracker.human_readable_size(1024, decimal_places="not an int")
            assert False, "Expected TypeError for invalid decimal_places"
        except TypeError:
            pass

        # get_dict_size
        assert ZakatTracker.get_dict_size({}) == sys.getsizeof({}), "Empty dictionary size mismatch"
        assert ZakatTracker.get_dict_size({"a": 1, "b": 2.5, "c": True}) != sys.getsizeof({}), "Not Empty dictionary"

        # number scale
        error = 0
        total = 0
        for sign in ['', '-']:
            for max_i, max_j, decimal_places in [
                (101, 101, 2),  # fiat currency minimum unit took 2 decimal places
                (1, 1_000, 8),  # cryptocurrency like Satoshi in Bitcoin took 8 decimal places
                (1, 1_000, 18)  # cryptocurrency like Wei in Ethereum took 18 decimal places
            ]:
                for return_type in (
                        float,
                        Decimal,
                ):
                    for i in range(max_i):
                        for j in range(max_j):
                            total += 1
                            num_str = f'{sign}{i}.{j:0{decimal_places}d}'
                            num = return_type(num_str)
                            scaled = self.scale(num, decimal_places=decimal_places)
                            unscaled = self.unscale(scaled, return_type=return_type, decimal_places=decimal_places)
                            if debug:
                                print(
                                    f'return_type: {return_type}, num_str: {num_str} - num: {num} - scaled: {scaled} - unscaled: {unscaled}')
                            if unscaled != num:
                                if debug:
                                    print('***** SCALE ERROR *****')
                                error += 1
        if debug:
            print(f'total: {total}, error({error}): {100 * error / total}%')
        assert error == 0

        assert self.nolock()
        assert self._history() is True

        table = {
            1: [
                (0, 10, 1000, 1000, 1000, 1, 1),
                (0, 20, 3000, 3000, 3000, 2, 2),
                (0, 30, 6000, 6000, 6000, 3, 3),
                (1, 15, 4500, 4500, 4500, 3, 4),
                (1, 50, -500, -500, -500, 4, 5),
                (1, 100, -10500, -10500, -10500, 5, 6),
            ],
            'wallet': [
                (1, 90, -9000, -9000, -9000, 1, 1),
                (0, 100, 1000, 1000, 1000, 2, 2),
                (1, 190, -18000, -18000, -18000, 3, 3),
                (0, 1000, 82000, 82000, 82000, 4, 4),
            ],
        }
        for x in table:
            for y in table[x]:
                self.lock()
                if y[0] == 0:
                    ref = self.track(
                        unscaled_value=y[1],
                        desc='test-add',
                        account=x,
                        logging=True,
                        created=ZakatTracker.time(),
                        debug=debug,
                    )
                else:
                    (ref, z) = self.sub(
                        unscaled_value=y[1],
                        desc='test-sub',
                        account=x,
                        created=ZakatTracker.time(),
                    )
                    if debug:
                        print('_sub', z, ZakatTracker.time())
                assert ref != 0
                assert len(self._vault['account'][x]['log'][ref]['file']) == 0
                for i in range(3):
                    file_ref = self.add_file(x, ref, 'file_' + str(i))
                    sleep(0.0000001)
                    assert file_ref != 0
                    if debug:
                        print('ref', ref, 'file', file_ref)
                    assert len(self._vault['account'][x]['log'][ref]['file']) == i + 1
                file_ref = self.add_file(x, ref, 'file_' + str(3))
                assert self.remove_file(x, ref, file_ref)
                z = self.balance(x)
                if debug:
                    print("debug-0", z, y)
                assert z == y[2]
                z = self.balance(x, False)
                if debug:
                    print("debug-1", z, y[3])
                assert z == y[3]
                o = self._vault['account'][x]['log']
                z = 0
                for i in o:
                    z += o[i]['value']
                if debug:
                    print("debug-2", z, type(z))
                    print("debug-2", y[4], type(y[4]))
                assert z == y[4]
                if debug:
                    print('debug-2 - PASSED')
                assert self.box_size(x) == y[5]
                assert self.log_size(x) == y[6]
                assert not self.nolock()
                self.free(self.lock())
                assert self.nolock()
            assert self.boxes(x) != {}
            assert self.logs(x) != {}

            assert not self.hide(x)
            assert self.hide(x, False) is False
            assert self.hide(x) is False
            assert self.hide(x, True)
            assert self.hide(x)

            assert self.zakatable(x)
            assert self.zakatable(x, False) is False
            assert self.zakatable(x) is False
            assert self.zakatable(x, True)
            assert self.zakatable(x)

        if restore is True:
            count = len(self._vault['history'])
            if debug:
                print('history-count', count)
            assert count == 10
            # try mode
            for _ in range(count):
                assert self.recall(True, debug)
            count = len(self._vault['history'])
            if debug:
                print('history-count', count)
            assert count == 10
            _accounts = list(table.keys())
            accounts_limit = len(_accounts) + 1
            for i in range(-1, -accounts_limit, -1):
                account = _accounts[i]
                if debug:
                    print(account, len(table[account]))
                transaction_limit = len(table[account]) + 1
                for j in range(-1, -transaction_limit, -1):
                    row = table[account][j]
                    if debug:
                        print(row, self.balance(account), self.balance(account, False))
                    assert self.balance(account) == self.balance(account, False)
                    assert self.balance(account) == row[2]
                    assert self.recall(False, debug)
            assert self.recall(False, debug) is False
            count = len(self._vault['history'])
            if debug:
                print('history-count', count)
            assert count == 0
        self.reset()

    def test(self, debug: bool = False) -> bool:
        if debug:
            print('test', f'debug={debug}')
        try:

            self._test_core(True, debug)
            self._test_core(False, debug)

            assert self._history()

            # Not allowed for duplicate transactions in the same account and time

            created = ZakatTracker.time()
            self.track(100, 'test-1', 'same', True, created)
            failed = False
            try:
                self.track(50, 'test-1', 'same', True, created)
            except:
                failed = True
            assert failed is True

            self.reset()

            # Same account transfer
            for x in [1, 'a', True, 1.8, None]:
                failed = False
                try:
                    self.transfer(1, x, x, 'same-account', debug=debug)
                except:
                    failed = True
                assert failed is True

            # Always preserve box age during transfer

            series: list[tuple] = [
                (30, 4),
                (60, 3),
                (90, 2),
            ]
            case = {
                3000: {
                    'series': series,
                    'rest': 15000,
                },
                6000: {
                    'series': series,
                    'rest': 12000,
                },
                9000: {
                    'series': series,
                    'rest': 9000,
                },
                18000: {
                    'series': series,
                    'rest': 0,
                },
                27000: {
                    'series': series,
                    'rest': -9000,
                },
                36000: {
                    'series': series,
                    'rest': -18000,
                },
            }

            selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle()

            for total in case:
                if debug:
                    print('--------------------------------------------------------')
                    print(f'case[{total}]', case[total])
                for x in case[total]['series']:
                    self.track(
                        unscaled_value=x[0],
                        desc=f"test-{x} ages",
                        account='ages',
                        logging=True,
                        created=selected_time * x[1],
                    )

                unscaled_total = self.unscale(total)
                if debug:
                    print('unscaled_total', unscaled_total)
                refs = self.transfer(
                    unscaled_amount=unscaled_total,
                    from_account='ages',
                    to_account='future',
                    desc='Zakat Movement',
                    debug=debug,
                )

                if debug:
                    print('refs', refs)

                ages_cache_balance = self.balance('ages')
                ages_fresh_balance = self.balance('ages', False)
                rest = case[total]['rest']
                if debug:
                    print('source', ages_cache_balance, ages_fresh_balance, rest)
                assert ages_cache_balance == rest
                assert ages_fresh_balance == rest

                future_cache_balance = self.balance('future')
                future_fresh_balance = self.balance('future', False)
                if debug:
                    print('target', future_cache_balance, future_fresh_balance, total)
                    print('refs', refs)
                assert future_cache_balance == total
                assert future_fresh_balance == total

                # TODO: check boxes times for `ages` should equal box times in `future`
                for ref in self._vault['account']['ages']['box']:
                    ages_capital = self._vault['account']['ages']['box'][ref]['capital']
                    ages_rest = self._vault['account']['ages']['box'][ref]['rest']
                    future_capital = 0
                    if ref in self._vault['account']['future']['box']:
                        future_capital = self._vault['account']['future']['box'][ref]['capital']
                    future_rest = 0
                    if ref in self._vault['account']['future']['box']:
                        future_rest = self._vault['account']['future']['box'][ref]['rest']
                    if ages_capital != 0 and future_capital != 0 and future_rest != 0:
                        if debug:
                            print('================================================================')
                            print('ages', ages_capital, ages_rest)
                            print('future', future_capital, future_rest)
                        if ages_rest == 0:
                            assert ages_capital == future_capital
                        elif ages_rest < 0:
                            assert -ages_capital == future_capital
                        elif ages_rest > 0:
                            assert ages_capital == ages_rest + future_capital
                self.reset()
                assert len(self._vault['history']) == 0

            assert self._history()
            assert self._history(False) is False
            assert self._history() is False
            assert self._history(True)
            assert self._history()
            if debug:
                print('####################################################################')

            transaction = [
                (
                    20, 'wallet', 1, -2000, -2000, -2000, 1, 1,
                    2000, 2000, 2000, 1, 1,
                ),
                (
                    750, 'wallet', 'safe', -77000, -77000, -77000, 2, 2,
                    75000, 75000, 75000, 1, 1,
                ),
                (
                    600, 'safe', 'bank', 15000, 15000, 15000, 1, 2,
                    60000, 60000, 60000, 1, 1,
                ),
            ]
            for z in transaction:
                self.lock()
                x = z[1]
                y = z[2]
                self.transfer(
                    unscaled_amount=z[0],
                    from_account=x,
                    to_account=y,
                    desc='test-transfer',
                    debug=debug,
                )
                zz = self.balance(x)
                if debug:
                    print(zz, z)
                assert zz == z[3]
                xx = self.accounts()[x]
                assert xx == z[3]
                assert self.balance(x, False) == z[4]
                assert xx == z[4]

                s = 0
                log = self._vault['account'][x]['log']
                for i in log:
                    s += log[i]['value']
                if debug:
                    print('s', s, 'z[5]', z[5])
                assert s == z[5]

                assert self.box_size(x) == z[6]
                assert self.log_size(x) == z[7]

                yy = self.accounts()[y]
                assert self.balance(y) == z[8]
                assert yy == z[8]
                assert self.balance(y, False) == z[9]
                assert yy == z[9]

                s = 0
                log = self._vault['account'][y]['log']
                for i in log:
                    s += log[i]['value']
                assert s == z[10]

                assert self.box_size(y) == z[11]
                assert self.log_size(y) == z[12]
                assert self.free(self.lock())

            if debug:
                pp().pprint(self.check(2.17))

            assert not self.nolock()
            history_count = len(self._vault['history'])
            if debug:
                print('history-count', history_count)
            assert history_count == 4
            assert not self.free(ZakatTracker.time())
            assert self.free(self.lock())
            assert self.nolock()
            assert len(self._vault['history']) == 3

            # storage

            _path = self.path(f'./zakat_test_db/test.{self.ext()}')
            if os.path.exists(_path):
                os.remove(_path)
            self.save()
            assert os.path.getsize(_path) > 0
            self.reset()
            assert self.recall(False, debug) is False
            self.load()
            assert self._vault['account'] is not None

            # recall

            assert self.nolock()
            assert len(self._vault['history']) == 3
            assert self.recall(False, debug) is True
            assert len(self._vault['history']) == 2
            assert self.recall(False, debug) is True
            assert len(self._vault['history']) == 1
            assert self.recall(False, debug) is True
            assert len(self._vault['history']) == 0
            assert self.recall(False, debug) is False
            assert len(self._vault['history']) == 0

            # exchange

            self.exchange("cash", 25, 3.75, "2024-06-25")
            self.exchange("cash", 22, 3.73, "2024-06-22")
            self.exchange("cash", 15, 3.69, "2024-06-15")
            self.exchange("cash", 10, 3.66)

            for i in range(1, 30):
                exchange = self.exchange("cash", i)
                rate, description, created = exchange['rate'], exchange['description'], exchange['time']
                if debug:
                    print(i, rate, description, created)
                assert created
                if i < 10:
                    assert rate == 1
                    assert description is None
                elif i == 10:
                    assert rate == 3.66
                    assert description is None
                elif i < 15:
                    assert rate == 3.66
                    assert description is None
                elif i == 15:
                    assert rate == 3.69
                    assert description is not None
                elif i < 22:
                    assert rate == 3.69
                    assert description is not None
                elif i == 22:
                    assert rate == 3.73
                    assert description is not None
                elif i >= 25:
                    assert rate == 3.75
                    assert description is not None
                exchange = self.exchange("bank", i)
                rate, description, created = exchange['rate'], exchange['description'], exchange['time']
                if debug:
                    print(i, rate, description, created)
                assert created
                assert rate == 1
                assert description is None

            assert len(self._vault['exchange']) > 0
            assert len(self.exchanges()) > 0
            self._vault['exchange'].clear()
            assert len(self._vault['exchange']) == 0
            assert len(self.exchanges()) == 0

            # store exchange rates keyed by nanosecond timestamps
            self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25")
            self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22")
            self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15")
            self.exchange("cash", ZakatTracker.day_to_time(10), 3.66)

            for i in [x * 0.12 for x in range(-15, 21)]:
                if i <= 0:
                    assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0
                else:
                    assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0

            # check the results using nanosecond timestamps
            for i in range(1, 31):
                timestamp_ns = ZakatTracker.day_to_time(i)
                exchange = self.exchange("cash", timestamp_ns)
                rate, description, created = exchange['rate'], exchange['description'], exchange['time']
                if debug:
                    print(i, rate, description, created)
                assert created
                if i < 10:
                    assert rate == 1
                    assert description is None
                elif i == 10:
                    assert rate == 3.66
                    assert description is None
                elif i < 15:
                    assert rate == 3.66
                    assert description is None
                elif i == 15:
                    assert rate == 3.69
                    assert description is not None
                elif i < 22:
                    assert rate == 3.69
                    assert description is not None
                elif i == 22:
                    assert rate == 3.73
                    assert description is not None
                elif i >= 25:
                    assert rate == 3.75
                    assert description is not None
                exchange = self.exchange("bank", i)
                rate, description, created = exchange['rate'], exchange['description'], exchange['time']
                if debug:
                    print(i, rate, description, created)
                assert created
                assert rate == 1
                assert description is None

            # csv

            csv_count = 1000

            for with_rate, path in {
                False: 'test-import_csv-no-exchange',
                True: 'test-import_csv-with-exchange',
            }.items():

                if debug:
                    print('test_import_csv', with_rate, path)

                csv_path = path + '.csv'
                if os.path.exists(csv_path):
                    os.remove(csv_path)
                c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug)
                if debug:
                    print('generate_random_csv_file', c)
                assert c == csv_count
                assert os.path.getsize(csv_path) > 0
                cache_path = self.import_csv_cache_path()
                if os.path.exists(cache_path):
                    os.remove(cache_path)
                self.reset()
                (created, found, bad) = self.import_csv(csv_path, debug)
                bad_count = len(bad)
                assert bad_count > 0
                if debug:
                    print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})")
                    print('bad', bad)
                tmp_size = os.path.getsize(cache_path)
                assert tmp_size > 0
                # TODO: assert created + found + bad_count == csv_count
                # TODO: assert created == csv_count
                # TODO: assert bad_count == 0
                (created_2, found_2, bad_2) = self.import_csv(csv_path)
                bad_2_count = len(bad_2)
                if debug:
                    print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})")
                    print('bad', bad_2)
                assert bad_2_count > 0
                # TODO: assert tmp_size == os.path.getsize(cache_path)
                # TODO: assert created_2 + found_2 + bad_2_count == csv_count
                # TODO: assert created == found_2
                # TODO: assert bad_count == bad_2_count
                # TODO: assert found_2 == csv_count
                # TODO: assert bad_2_count == 0
                # TODO: assert created_2 == 0

                # payment parts

                positive_parts = self.build_payment_parts(100, positive_only=True)
                assert self.check_payment_parts(positive_parts) != 0
                assert self.check_payment_parts(positive_parts) != 0
                all_parts = self.build_payment_parts(300, positive_only=False)
                assert self.check_payment_parts(all_parts) != 0
                assert self.check_payment_parts(all_parts) != 0
                if debug:
                    pp().pprint(positive_parts)
                    pp().pprint(all_parts)
                # dynamic discount
                suite = []
                count = 3
                for exceed in [False, True]:
                    case = []
                    for parts in [positive_parts, all_parts]:
part = parts.copy() 2883 demand = part['demand'] 2884 if debug: 2885 print(demand, part['total']) 2886 i = 0 2887 z = demand / count 2888 cp = { 2889 'account': {}, 2890 'demand': demand, 2891 'exceed': exceed, 2892 'total': part['total'], 2893 } 2894 j = '' 2895 for x, y in part['account'].items(): 2896 x_exchange = self.exchange(x) 2897 zz = self.exchange_calc(z, 1, x_exchange['rate']) 2898 if exceed and zz <= demand: 2899 i += 1 2900 y['part'] = zz 2901 if debug: 2902 print(exceed, y) 2903 cp['account'][x] = y 2904 case.append(y) 2905 elif not exceed and y['balance'] >= zz: 2906 i += 1 2907 y['part'] = zz 2908 if debug: 2909 print(exceed, y) 2910 cp['account'][x] = y 2911 case.append(y) 2912 j = x 2913 if i >= count: 2914 break 2915 if len(cp['account'][j]) > 0: 2916 suite.append(cp) 2917 if debug: 2918 print('suite', len(suite)) 2919 # vault = self._vault.copy() 2920 for case in suite: 2921 # self._vault = vault.copy() 2922 if debug: 2923 print('case', case) 2924 result = self.check_payment_parts(case) 2925 if debug: 2926 print('check_payment_parts', result, f'exceed: {exceed}') 2927 assert result == 0 2928 2929 report = self.check(2.17, None, debug) 2930 (valid, brief, plan) = report 2931 if debug: 2932 print('valid', valid) 2933 zakat_result = self.zakat(report, parts=case, debug=debug) 2934 if debug: 2935 print('zakat-result', zakat_result) 2936 assert valid == zakat_result 2937 2938 assert self.save(path + f'.{self.ext()}') 2939 assert self.export_json(path + '.json') 2940 2941 assert self.export_json("1000-transactions-test.json") 2942 assert self.save(f"1000-transactions-test.{self.ext()}") 2943 2944 self.reset() 2945 2946 # test transfer between accounts with different exchange rate 2947 2948 a_SAR = "Bank (SAR)" 2949 b_USD = "Bank (USD)" 2950 c_SAR = "Safe (SAR)" 2951 # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer 2952 for case in [ 2953 (0, a_SAR, "SAR Gift", 1000, 100000), 2954 (1, a_SAR, 1), 2955 (0, b_USD, "USD Gift", 500, 50000), 2956 
(1, b_USD, 1), 2957 (2, b_USD, 3.75), 2958 (1, b_USD, 3.75), 2959 (3, 100, b_USD, a_SAR, "100 USD -> SAR", 40000, 137500), 2960 (0, c_SAR, "Salary", 750, 75000), 2961 (3, 375, c_SAR, b_USD, "375 SAR -> USD", 37500, 50000), 2962 (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 137125, 50100), 2963 ]: 2964 if debug: 2965 print('case', case) 2966 match (case[0]): 2967 case 0: # track 2968 _, account, desc, x, balance = case 2969 self.track(unscaled_value=x, desc=desc, account=account, debug=debug) 2970 2971 cached_value = self.balance(account, cached=True) 2972 fresh_value = self.balance(account, cached=False) 2973 if debug: 2974 print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value) 2975 assert cached_value == balance 2976 assert fresh_value == balance 2977 case 1: # check-exchange 2978 _, account, expected_rate = case 2979 t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2980 if debug: 2981 print('t-exchange', t_exchange) 2982 assert t_exchange['rate'] == expected_rate 2983 case 2: # do-exchange 2984 _, account, rate = case 2985 self.exchange(account, rate=rate, debug=debug) 2986 b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2987 if debug: 2988 print('b-exchange', b_exchange) 2989 assert b_exchange['rate'] == rate 2990 case 3: # transfer 2991 _, x, a, b, desc, a_balance, b_balance = case 2992 self.transfer(x, a, b, desc, debug=debug) 2993 2994 cached_value = self.balance(a, cached=True) 2995 fresh_value = self.balance(a, cached=False) 2996 if debug: 2997 print('account', a, 'cached_value', cached_value, 'fresh_value', fresh_value, 'a_balance', a_balance) 2998 assert cached_value == a_balance 2999 assert fresh_value == a_balance 3000 3001 cached_value = self.balance(b, cached=True) 3002 fresh_value = self.balance(b, cached=False) 3003 if debug: 3004 print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value) 3005 assert cached_value == b_balance 3006 assert 
fresh_value == b_balance 3007 3008 # Transfer all in many chunks randomly from B to A 3009 a_SAR_balance = 137125 3010 b_USD_balance = 50100 3011 b_USD_exchange = self.exchange(b_USD) 3012 amounts = ZakatTracker.create_random_list(b_USD_balance, max_value=1000) 3013 if debug: 3014 print('amounts', amounts) 3015 i = 0 3016 for x in amounts: 3017 if debug: 3018 print(f'{i} - transfer-with-exchange({x})') 3019 self.transfer( 3020 unscaled_amount=self.unscale(x), 3021 from_account=b_USD, 3022 to_account=a_SAR, 3023 desc=f"{x} USD -> SAR", 3024 debug=debug, 3025 ) 3026 3027 b_USD_balance -= x 3028 cached_value = self.balance(b_USD, cached=True) 3029 fresh_value = self.balance(b_USD, cached=False) 3030 if debug: 3031 print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3032 b_USD_balance) 3033 assert cached_value == b_USD_balance 3034 assert fresh_value == b_USD_balance 3035 3036 a_SAR_balance += int(x * b_USD_exchange['rate']) 3037 cached_value = self.balance(a_SAR, cached=True) 3038 fresh_value = self.balance(a_SAR, cached=False) 3039 if debug: 3040 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3041 a_SAR_balance, 'rate', b_USD_exchange['rate']) 3042 assert cached_value == a_SAR_balance 3043 assert fresh_value == a_SAR_balance 3044 i += 1 3045 3046 # Transfer all in many chunks randomly from C to A 3047 c_SAR_balance = 37500 3048 amounts = ZakatTracker.create_random_list(c_SAR_balance, max_value=1000) 3049 if debug: 3050 print('amounts', amounts) 3051 i = 0 3052 for x in amounts: 3053 if debug: 3054 print(f'{i} - transfer-with-exchange({x})') 3055 self.transfer( 3056 unscaled_amount=self.unscale(x), 3057 from_account=c_SAR, 3058 to_account=a_SAR, 3059 desc=f"{x} SAR -> a_SAR", 3060 debug=debug, 3061 ) 3062 3063 c_SAR_balance -= x 3064 cached_value = self.balance(c_SAR, cached=True) 3065 fresh_value = self.balance(c_SAR, cached=False) 3066 if debug: 3067 print('account', c_SAR, 
'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3068 c_SAR_balance) 3069 assert cached_value == c_SAR_balance 3070 assert fresh_value == c_SAR_balance 3071 3072 a_SAR_balance += x 3073 cached_value = self.balance(a_SAR, cached=True) 3074 fresh_value = self.balance(a_SAR, cached=False) 3075 if debug: 3076 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3077 a_SAR_balance) 3078 assert cached_value == a_SAR_balance 3079 assert fresh_value == a_SAR_balance 3080 i += 1 3081 3082 assert self.export_json("accounts-transfer-with-exchange-rates.json") 3083 assert self.save(f"accounts-transfer-with-exchange-rates.{self.ext()}") 3084 3085 # check & zakat with exchange rates for many cycles 3086 3087 for rate, values in { 3088 1: { 3089 'in': [1000, 2000, 10000], 3090 'exchanged': [100000, 200000, 1000000], 3091 'out': [2500, 5000, 73140], 3092 }, 3093 3.75: { 3094 'in': [200, 1000, 5000], 3095 'exchanged': [75000, 375000, 1875000], 3096 'out': [1875, 9375, 137138], 3097 }, 3098 }.items(): 3099 a, b, c = values['in'] 3100 m, n, o = values['exchanged'] 3101 x, y, z = values['out'] 3102 if debug: 3103 print('rate', rate, 'values', values) 3104 for case in [ 3105 (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3106 {'safe': {0: {'below_nisab': x}}}, 3107 ], False, m), 3108 (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3109 {'safe': {0: {'count': 1, 'total': y}}}, 3110 ], True, n), 3111 (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [ 3112 {'cave': {0: {'count': 3, 'total': z}}}, 3113 ], True, o), 3114 ]: 3115 if debug: 3116 print(f"############# check(rate: {rate}) #############") 3117 print('case', case) 3118 self.reset() 3119 self.exchange(account=case[1], created=case[2], rate=rate) 3120 self.track(unscaled_value=case[0], desc='test-check', account=case[1], logging=True, created=case[2]) 3121 assert self.snapshot() 3122 3123 # assert self.nolock() 3124 # 
history_size = len(self._vault['history']) 3125 # print('history_size', history_size) 3126 # assert history_size == 2 3127 assert self.lock() 3128 assert not self.nolock() 3129 report = self.check(2.17, None, debug) 3130 (valid, brief, plan) = report 3131 if debug: 3132 print('brief', brief) 3133 assert valid == case[4] 3134 assert case[5] == brief[0] 3135 assert case[5] == brief[1] 3136 3137 if debug: 3138 pp().pprint(plan) 3139 3140 for x in plan: 3141 assert case[1] == x 3142 if 'total' in case[3][0][x][0].keys(): 3143 assert case[3][0][x][0]['total'] == int(brief[2]) 3144 assert int(plan[x][0]['total']) == case[3][0][x][0]['total'] 3145 assert int(plan[x][0]['count']) == case[3][0][x][0]['count'] 3146 else: 3147 assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab'] 3148 if debug: 3149 pp().pprint(report) 3150 result = self.zakat(report, debug=debug) 3151 if debug: 3152 print('zakat-result', result, case[4]) 3153 assert result == case[4] 3154 report = self.check(2.17, None, debug) 3155 (valid, brief, plan) = report 3156 assert valid is False 3157 3158 history_size = len(self._vault['history']) 3159 if debug: 3160 print('history_size', history_size) 3161 assert history_size == 3 3162 assert not self.nolock() 3163 assert self.recall(False, debug) is False 3164 self.free(self.lock()) 3165 assert self.nolock() 3166 3167 for i in range(3, 0, -1): 3168 history_size = len(self._vault['history']) 3169 if debug: 3170 print('history_size', history_size) 3171 assert history_size == i 3172 assert self.recall(False, debug) is True 3173 3174 assert self.nolock() 3175 assert self.recall(False, debug) is False 3176 3177 history_size = len(self._vault['history']) 3178 if debug: 3179 print('history_size', history_size) 3180 assert history_size == 0 3181 3182 account_size = len(self._vault['account']) 3183 if debug: 3184 print('account_size', account_size) 3185 assert account_size == 0 3186 3187 report_size = len(self._vault['report']) 3188 if debug: 3189 
print('report_size', report_size) 3190 assert report_size == 0 3191 3192 assert self.nolock() 3193 return True 3194 except Exception as e: 3195 # pp().pprint(self._vault) 3196 assert self.export_json("test-snapshot.json") 3197 assert self.save(f"test-snapshot.{self.ext()}") 3198 raise e
A class for tracking and calculating Zakat.
This class provides functionalities for recording transactions, calculating Zakat due, and managing account balances. It also offers features like importing transactions from CSV files, exporting data to JSON format, and saving/loading the tracker state.
The ZakatTracker class is designed to handle both positive and negative transactions, allowing for flexible tracking of financial activities related to Zakat. It also supports the concepts of "Nisab" (the minimum threshold for Zakat) and "haul" (one complete year of holding a transaction), and calculates Zakat due based on the current silver price.
The class persists the tracker state in a database file with the 'camel' extension, so that data survives across sessions. It also provides options for enabling or disabling history tracking, allowing users to choose their preferred level of detail.
In addition, the ZakatTracker class includes various helper methods such as time, time_to_datetime, lock, free, recall, export_json, and more. These methods provide additional functionality and flexibility for interacting with and managing the Zakat tracker.
Attributes:
ZakatTracker.ZakatCut (function): A function to calculate the Zakat amount (2.5% of a given value).
ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat (one lunar year, in nanoseconds).
ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price.
ZakatTracker.Version (function): A function returning the version of the ZakatTracker class.
Data Structure:
The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data.

_vault (dict):
    - account (dict):
        - {account_number} (dict):
            - balance (int): The current balance of the account.
            - box (dict): A dictionary storing transaction details.
                - {timestamp} (dict):
                    - capital (int): The initial amount of the transaction.
                    - count (int): The number of times Zakat has been calculated for this transaction.
                    - last (int): The timestamp of the last Zakat calculation.
                    - rest (int): The remaining amount after Zakat deductions and withdrawal.
                    - total (int): The total Zakat deducted from this transaction.
            - count (int): The total number of transactions for the account.
            - log (dict): A dictionary storing transaction logs.
                - {timestamp} (dict):
                    - value (int): The transaction amount (positive or negative).
                    - desc (str): The description of the transaction.
                    - ref (int): The box reference (positive or None).
                    - file (dict): A dictionary storing file references associated with the transaction.
            - hide (bool): Indicates whether the account is hidden or not.
            - zakatable (bool): Indicates whether the account is subject to Zakat.
    - exchange (dict):
        - account (dict):
            - {timestamps} (dict):
                - rate (float): Exchange rate when compared to local currency.
                - description (str): The description of the exchange rate.
    - history (dict):
        - {timestamp} (list): A list of dictionaries storing the history of actions performed.
            - {action_dict} (dict):
                - action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT).
                - account (str): The account number associated with the action.
                - ref (int): The reference number of the transaction.
                - file (int): The reference number of the file (if applicable).
                - key (str): The key associated with the action (e.g., 'rest', 'total').
                - value (int): The value associated with the action.
                - math (MathOperation): The mathematical operation performed (if applicable).
    - lock (int or None): The timestamp indicating the current lock status (None if not locked).
    - report (dict):
        - {timestamp} (tuple): A tuple storing Zakat report details.
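As a rough illustration of the layout above, the sketch below builds a vault that holds one account with a single box and a single log entry. Only the key names come from the description; the account name, the timestamp, and the amounts are invented, and the final check (balance equals the sum of the boxes' `rest` values) is an assumed invariant rather than documented behavior.

```python
# Illustrative only: key names follow the layout described above, while the
# account name, timestamp, and amounts are made-up values.
timestamp = 1_700_000_000_000_000_000  # nanoseconds since the Unix epoch

vault = {
    'account': {
        'safe': {
            'balance': 100_000,  # scaled integer, i.e. 1000.00
            'box': {
                timestamp: {
                    'capital': 100_000,
                    'count': 0,
                    'last': 0,
                    'rest': 100_000,
                    'total': 0,
                },
            },
            'count': 1,
            'log': {
                timestamp: {
                    'value': 100_000,
                    'desc': 'Initial deposit',
                    'ref': timestamp,
                    'file': {},
                },
            },
            'hide': False,
            'zakatable': True,
        },
    },
    'exchange': {},
    'history': {},
    'lock': None,
    'report': {},
}

# Assumed invariant: the balance matches what is left across all boxes.
rest_sum = sum(box['rest'] for box in vault['account']['safe']['box'].values())
print(rest_sum == vault['account']['safe']['balance'])
```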
    def __init__(self, db_path: str = "./zakat_db/zakat.camel", history_mode: bool = True):
        """
        Initialize ZakatTracker with database path and history mode.

        Parameters:
        db_path (str): The path to the database file. Default is "./zakat_db/zakat.camel".
        history_mode (bool): The mode for tracking history. Default is True.

        Returns:
        None
        """
        self._base_path = None
        self._vault_path = None
        self._vault = None
        self.reset()
        self._history(history_mode)
        self.path(db_path)
Initialize ZakatTracker with database path and history mode.
Parameters:
db_path (str): The path to the database file. Default is "./zakat_db/zakat.camel".
history_mode (bool): The mode for tracking history. Default is True.
Returns: None
    @staticmethod
    def Version() -> str:
        """
        Returns the current version of the software.

        This function returns a string representing the current version of the software,
        including major, minor, and patch version numbers in the format "X.Y.Z".

        Returns:
        str: The current version of the software.
        """
        return '0.2.84'
Returns the current version of the software.
This function returns a string representing the current version of the software, including major, minor, and patch version numbers in the format "X.Y.Z".
Returns: str: The current version of the software.
    @staticmethod
    def ZakatCut(x: float) -> float:
        """
        Calculates the Zakat amount due on an asset.

        This function calculates the zakat amount due on a given asset value over one lunar year.
        Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth
        that exceeds a certain threshold (Nisab).

        Parameters:
        x: The total value of the asset on which Zakat is to be calculated.

        Returns:
        The amount of Zakat due on the asset, calculated as 2.5% of the asset's value.
        """
        return 0.025 * x  # Zakat Cut in one Lunar Year
Calculates the Zakat amount due on an asset.
This function calculates the zakat amount due on a given asset value over one lunar year. Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth that exceeds a certain threshold (Nisab).
Parameters: x: The total value of the asset on which Zakat is to be calculated.
Returns: The amount of Zakat due on the asset, calculated as 2.5% of the asset's value.
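A minimal sketch of the 2.5% rule described above, written as a standalone function so it runs without the library. The name `zakat_cut` and the sample amount are illustrative only.

```python
def zakat_cut(x: float) -> float:
    """Standalone copy of the 2.5% rule, for illustration only."""
    return 0.025 * x


holdings = 10_000.0  # made-up asset value
due = zakat_cut(holdings)
print(f"Zakat due on {holdings:.2f}: {due:.2f}")
```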
    @staticmethod
    def TimeCycle(days: int = 355) -> int:
        """
        Calculates the approximate duration of a lunar year in nanoseconds.

        This function calculates the approximate duration of a lunar year based on the given number of days.
        It converts the given number of days into nanoseconds for use in high-precision timing applications.

        Parameters:
        days: The number of days in a lunar year. Defaults to 355,
        which is an approximation of the average length of a lunar year.

        Returns:
        The approximate duration of a lunar year in nanoseconds.
        """
        return int(60 * 60 * 24 * days * 1e9)  # Lunar Year in nanoseconds
Calculates the approximate duration of a lunar year in nanoseconds.
This function calculates the approximate duration of a lunar year based on the given number of days. It converts the given number of days into nanoseconds for use in high-precision timing applications.
Parameters: days: The number of days in a lunar year. Defaults to 355, a rounded-up approximation of the length of a lunar year (about 354.37 days on average).
Returns: The approximate duration of a lunar year in nanoseconds.
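The conversion above amounts to multiplying the day count by the number of nanoseconds in a day. The sketch below uses an integer-only variant and a hypothetical helper, `haul_completed`, to show how such a cycle length could be compared against nanosecond timestamps; the `>=` comparison is an assumption, not the library's documented rule.

```python
NS_PER_DAY = 86_400 * 10**9  # nanoseconds in one day


def time_cycle(days: int = 355) -> int:
    """Integer-only variant of the lunar-year length in nanoseconds."""
    return days * NS_PER_DAY


def haul_completed(created_ns: int, now_ns: int) -> bool:
    """Hypothetical helper: has at least one full cycle elapsed?"""
    return now_ns - created_ns >= time_cycle()


created = 0
almost = haul_completed(created, time_cycle() - 1)  # one nanosecond short
exact = haul_completed(created, time_cycle())       # exactly one cycle
print(time_cycle(), almost, exact)
```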
    @staticmethod
    def Nisab(gram_price: float, gram_quantity: float = 595) -> float:
        """
        Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram.

        This function calculates the Nisab value, which is the minimum threshold of wealth,
        that makes an individual liable for paying Zakat.
        The Nisab value is determined by the equivalent value of a specific amount
        of gold or silver (currently 595 grams in silver) in the local currency.

        Parameters:
        - gram_price (float): The price per gram of Nisab.
        - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver.

        Returns:
        - float: The total value of Nisab based on the given price per gram.
        """
        return gram_price * gram_quantity
Calculate the monetary value of the Nisab (the minimum wealth threshold for Zakat in Islamic jurisprudence) based on the given price per gram.
This function calculates the Nisab value, which is the minimum threshold of wealth, that makes an individual liable for paying Zakat. The Nisab value is determined by the equivalent value of a specific amount of gold or silver (currently 595 grams in silver) in the local currency.
Parameters:
- gram_price (float): The price per gram of the reference metal (silver by default).
- gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver.
Returns:
- float: The total value of Nisab based on the given price per gram.
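To make the threshold concrete, here is a small sketch that recomputes the Nisab as price per gram times 595 grams and compares two sample amounts against it. The function name `nisab`, the price of 2.17 per gram, and the sample amounts are illustrative assumptions.

```python
def nisab(gram_price: float, gram_quantity: float = 595) -> float:
    """Standalone copy of the price-times-quantity rule, for illustration."""
    return gram_price * gram_quantity


silver_gram_price = 2.17  # arbitrary example price in local currency
threshold = nisab(silver_gram_price)

for wealth in (1000, 2000):
    status = 'at or above' if wealth >= threshold else 'below'
    print(f"{wealth} is {status} the Nisab of {threshold:.2f}")
```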
    @staticmethod
    def ext() -> str:
        """
        Returns the file extension used by the ZakatTracker class.

        Returns:
        str: The file extension used by the ZakatTracker class, which is 'camel'.
        """
        return 'camel'
Returns the file extension used by the ZakatTracker class.
Returns: str: The file extension used by the ZakatTracker class, which is 'camel'.
    def path(self, path: str = None) -> str:
        """
        Set or get the path to the database file.

        If no path is provided, the current path is returned.
        If a path is provided, it is set as the new path.
        The function also creates the necessary directories if the provided path is a file.

        Parameters:
        path (str): The new path to the database file. If not provided, the current path is returned.

        Returns:
        str: The current or new path to the database file.
        """
        if path is None:
            return self._vault_path
        self._vault_path = Path(path).resolve()
        base_path = Path(path).resolve()
        if base_path.is_file() or base_path.suffix:
            base_path = base_path.parent
        base_path.mkdir(parents=True, exist_ok=True)
        self._base_path = base_path
        return str(self._vault_path)
Set or get the path to the database file.
If no path is provided, the current path is returned. If a path is provided, it is set as the new path. If the provided path refers to a file (or has a file suffix), its parent directories are created as needed; otherwise the path itself is created as a directory.
Parameters: path (str): The new path to the database file. If not provided, the current path is returned.
Returns: str: The current or new path to the database file.
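The directory handling described above can be sketched with `pathlib` alone. The helper name `prepare_db_path` is hypothetical and the sketch runs inside a temporary directory; it shows that the parent directory is created while the database file itself is not.

```python
import tempfile
from pathlib import Path


def prepare_db_path(path: str) -> tuple[Path, Path]:
    """Hypothetical helper: resolve the file path and create its base directory."""
    vault_path = Path(path).resolve()
    base_path = vault_path
    # A path with a suffix (e.g. ".camel") is treated as a file, so its parent is the base.
    if base_path.is_file() or base_path.suffix:
        base_path = base_path.parent
    base_path.mkdir(parents=True, exist_ok=True)
    return vault_path, base_path


with tempfile.TemporaryDirectory() as tmp:
    vault_path, base_path = prepare_db_path(str(Path(tmp) / 'zakat_db' / 'zakat.camel'))
    dir_created = base_path.is_dir()     # the parent directory now exists
    file_created = vault_path.exists()   # the database file is not created here
    print(base_path.name, dir_created, file_created)
```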
    def base_path(self, *args) -> str:
        """
        Generate a base path by joining the provided arguments with the existing base path.

        Parameters:
        *args (str): Variable length argument list of strings to be joined with the base path.

        Returns:
        str: The generated base path. If no arguments are provided, the existing base path is returned.
        """
        if not args:
            return str(self._base_path)
        filtered_args = []
        ignored_filename = None
        for arg in args:
            if Path(arg).suffix:
                ignored_filename = arg
            else:
                filtered_args.append(arg)
        base_path = Path(self._base_path)
        full_path = base_path.joinpath(*filtered_args)
        full_path.mkdir(parents=True, exist_ok=True)
        if ignored_filename is not None:
            # Join with the ignored filename; convert to str to match the annotated return type
            return str(full_path.resolve() / ignored_filename)
        return str(full_path.resolve())
Generate a base path by joining the provided arguments with the existing base path.
Parameters: *args (str): Variable length argument list of strings to be joined with the base path.
Returns: str: The generated base path. If no arguments are provided, the existing base path is returned.
    @staticmethod
    def scale(x: float | int | Decimal, decimal_places: int = 2) -> int:
        """
        Scales a numerical value by a specified power of 10, returning an integer.

        This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and
        facilitate precise scaling operations, particularly useful in financial or scientific calculations.

        Parameters:
        x: The numeric value to scale. Can be a floating-point number, integer, or decimal.
        decimal_places: The exponent for the scaling factor (10**decimal_places). Defaults to 2, meaning the input is scaled
        by a factor of 100 (e.g., converts 1.23 to 123).

        Returns:
        The scaled value, rounded to the nearest integer.

        Raises:
        TypeError: If the input `x` is not a valid numeric type.

        Examples:
        >>> ZakatTracker.scale(3.14159)
        314
        >>> ZakatTracker.scale(1234, decimal_places=3)
        1234000
        >>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4)
        50
        """
        if not isinstance(x, (float, int, Decimal)):
            raise TypeError("Input 'x' must be a float, int, or Decimal.")
        return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))
Scales a numerical value by a specified power of 10, returning an integer.
This function is designed to handle various numeric types (float, int, or Decimal) and facilitate precise scaling operations, particularly useful in financial or scientific calculations.
Parameters:
x: The numeric value to scale. Can be a floating-point number, integer, or decimal.
decimal_places: The exponent for the scaling factor (10**decimal_places). Defaults to 2, meaning the input is scaled by a factor of 100 (e.g., converts 1.23 to 123).
Returns: The scaled value, rounded to the nearest integer.
Raises: TypeError: If the input x is not a valid numeric type.
Examples:
>>> ZakatTracker.scale(3.14159)
314
>>> ZakatTracker.scale(1234, decimal_places=3)
1234000
>>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4)
50
    @staticmethod
    def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal:
        """
        Unscales an integer by a power of 10.

        Parameters:
        x: The integer to unscale.
        return_type: The desired type for the returned value. Can be float or Decimal. Defaults to float.
        decimal_places: The power of 10 to use. Defaults to 2.

        Returns:
        The unscaled number, converted to the specified return_type.

        Raises:
        TypeError: If the return_type is not float or Decimal.
        """
        if return_type not in (float, Decimal):
            raise TypeError(f'Invalid return_type({return_type}). Supported types are float and Decimal.')
        return round(return_type(x / (10 ** decimal_places)), decimal_places)
Unscales an integer by a power of 10.
Parameters:
x: The integer to unscale.
return_type: The desired type for the returned value. Can be float or Decimal. Defaults to float.
decimal_places: The power of 10 to use. Defaults to 2.
Returns: The unscaled number, converted to the specified return_type.
Raises: TypeError: If the return_type is not float or Decimal.
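The following sketch shows why amounts are stored as scaled integers and how scaling and unscaling round-trip. The two functions are standalone copies of the listed logic (without the type checks), kept under the same names purely for illustration.

```python
from decimal import Decimal


def scale(x, decimal_places: int = 2) -> int:
    """Format to a fixed number of decimals, then shift to an integer."""
    return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))


def unscale(x: int, return_type: type = float, decimal_places: int = 2):
    """Shift an integer back by the same power of 10."""
    return round(return_type(x / (10 ** decimal_places)), decimal_places)


float_sum_ok = (0.1 + 0.2 == 0.3)                       # binary floats accumulate error
scaled_sum_ok = (scale(0.1) + scale(0.2) == scale(0.3))  # integers add exactly
round_trip = unscale(scale(12.34))

print(float_sum_ok)   # False
print(scaled_sum_ok)  # True
print(round_trip)     # 12.34
```

Keeping balances as integers means additions and subtractions are exact; conversion to a float or Decimal is only needed when a value is displayed.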
    def reset(self) -> None:
        """
        Reset the internal data structure to its initial state.

        Parameters:
        None

        Returns:
        None
        """
        self._vault = {
            'account': {},
            'exchange': {},
            'history': {},
            'lock': None,
            'report': {},
        }
Reset the internal data structure to its initial state.
Parameters: None
Returns: None
    @staticmethod
    def time(now: datetime = None) -> int:
        """
        Generates a timestamp based on the provided datetime object or the current datetime.

        Parameters:
        now (datetime, optional): The datetime object to generate the timestamp from.
        If not provided, the current datetime is used.

        Returns:
        int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970),
        before 1970 will return in negative until 1000AD.
        """
        if now is None:
            now = datetime.datetime.now()
        ordinal_day = now.toordinal()
        ns_in_day = (now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() * 10 ** 9
        return int((ordinal_day - 719_163) * 86_400_000_000_000 + ns_in_day)
Generates a timestamp based on the provided datetime object or the current datetime.
Parameters: now (datetime, optional): The datetime object to generate the timestamp from. If not provided, the current datetime is used.
Returns: int: The timestamp in nanoseconds since the Unix epoch (January 1, 1970). Dates before 1970 yield negative values, back to the year 1000 AD.
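A standalone sketch of the timestamp computation, assuming the same arithmetic as the listing: the day offset from 1970-01-01 (proleptic ordinal 719163) is converted to nanoseconds and the time elapsed since midnight is added. The function name `timestamp_ns` is illustrative.

```python
import datetime


def timestamp_ns(now: datetime.datetime) -> int:
    """Nanoseconds since 1970-01-01 for a naive datetime."""
    ordinal_day = now.toordinal()
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    ns_in_day = (now - midnight).total_seconds() * 10 ** 9
    return int((ordinal_day - 719_163) * 86_400_000_000_000 + ns_in_day)


print(timestamp_ns(datetime.datetime(1970, 1, 1)))           # 0
print(timestamp_ns(datetime.datetime(1970, 1, 2, 1, 0, 0)))  # 90000000000000
print(timestamp_ns(datetime.datetime(1969, 12, 31)))         # -86400000000000
```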
    @staticmethod
    def time_to_datetime(ordinal_ns: int) -> datetime:
        """
        Converts a nanosecond timestamp (nanoseconds since the Unix epoch) to a datetime object.

        Parameters:
        ordinal_ns (int): The timestamp in nanoseconds since 1970-01-01.

        Returns:
        datetime: The corresponding datetime object.
        """
        ordinal_day = ordinal_ns // 86_400_000_000_000 + 719_163
        ns_in_day = ordinal_ns % 86_400_000_000_000
        d = datetime.datetime.fromordinal(ordinal_day)
        t = datetime.timedelta(seconds=ns_in_day // 10 ** 9)
        return datetime.datetime.combine(d, datetime.time()) + t
Converts a nanosecond timestamp (nanoseconds since the Unix epoch, as produced by time) to a datetime object.
Parameters: ordinal_ns (int): The timestamp in nanoseconds since 1970-01-01.
Returns: datetime: The corresponding datetime object.
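The reverse conversion can be sketched the same way: floor-divide the nanosecond value into whole days, and turn the remainder into whole seconds within that day. The function name `ns_to_datetime` is illustrative; note that any sub-second remainder is discarded by the integer division.

```python
import datetime

NS_PER_DAY = 86_400_000_000_000
EPOCH_ORDINAL = 719_163  # datetime.date(1970, 1, 1).toordinal()


def ns_to_datetime(ordinal_ns: int) -> datetime.datetime:
    """Convert nanoseconds since 1970-01-01 to a naive datetime (whole seconds)."""
    ordinal_day = ordinal_ns // NS_PER_DAY + EPOCH_ORDINAL
    ns_in_day = ordinal_ns % NS_PER_DAY
    day = datetime.datetime.fromordinal(ordinal_day)  # midnight of that day
    return day + datetime.timedelta(seconds=ns_in_day // 10 ** 9)


print(ns_to_datetime(90_000_000_000_000))  # 1970-01-02 01:00:00
print(ns_to_datetime(-1))                  # 1969-12-31 23:59:59
```

Because Python's `//` and `%` round toward negative infinity, negative timestamps land on the previous day with a positive remainder, which is why the second call yields the last second of 1969.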
    def clean_history(self, lock: int | None = None) -> int:
        """
        Cleans up the history of actions performed on the ZakatTracker instance.

        Parameters:
        lock (int, optional): The lock ID is used to clean up the empty history.
        If not provided, it cleans up the empty history records for all locks.

        Returns:
        int: The number of locks cleaned up.
        """
        count = 0
        if lock in self._vault['history']:
            if len(self._vault['history'][lock]) <= 0:
                count += 1
                del self._vault['history'][lock]
                return count
        self.free(self.lock())
        # Iterate over a snapshot of the keys: deleting from a dict while
        # iterating over it directly raises RuntimeError.
        for lock in list(self._vault['history']):
            if len(self._vault['history'][lock]) <= 0:
                count += 1
                del self._vault['history'][lock]
        return count
Cleans up the history of actions performed on the ZakatTracker instance.
Parameters: lock (int, optional): The lock ID is used to clean up the empty history. If not provided, it cleans up the empty history records for all locks.
Returns: int: The number of locks cleaned up.
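A minimal sketch of pruning empty history records from a plain dictionary. The helper name `prune_empty` and the sample data are invented; the loop iterates over a snapshot of the keys because deleting from a dictionary while iterating over it directly raises `RuntimeError` in Python.

```python
def prune_empty(history: dict) -> int:
    """Remove entries whose action list is empty; return how many were removed."""
    count = 0
    for lock in list(history):  # snapshot of keys, safe to delete while looping
        if len(history[lock]) == 0:
            del history[lock]
            count += 1
    return count


history = {1: [], 2: [{'action': 'TRACK'}], 3: []}
removed = prune_empty(history)
print(removed, history)  # 2 {2: [{'action': 'TRACK'}]}
```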
    def nolock(self) -> bool:
        """
        Check if the vault lock is currently not set.

        Returns:
        bool: True if the vault lock is not set, False otherwise.
        """
        return self._vault['lock'] is None
Check if the vault lock is currently not set.
Returns: bool: True if the vault lock is not set, False otherwise.
    def lock(self) -> int:
        """
        Acquires a lock on the ZakatTracker instance.

        Returns:
        int: The lock ID. This ID can be used to release the lock later.
        """
        return self._step()
Acquires a lock on the ZakatTracker instance.
Returns: int: The lock ID. This ID can be used to release the lock later.
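The ID-based locking idea can be illustrated with a small stand-in class. This is a hypothetical sketch, not the ZakatTracker implementation: in particular, it assumes that calling `lock()` while a lock is already held returns the existing ID, and it uses `time.time_ns()` as the ID source.

```python
import time


class TinyLock:
    """Hypothetical stand-in that mimics an ID-based lock."""

    def __init__(self) -> None:
        self._lock = None

    def nolock(self) -> bool:
        return self._lock is None

    def lock(self) -> int:
        # Assumption: an existing lock ID is reused rather than replaced.
        if self._lock is None:
            self._lock = time.time_ns()
        return self._lock

    def free(self, lock: int) -> bool:
        # Only the holder of the matching ID can release the lock.
        if lock == self._lock:
            self._lock = None
            return True
        return False


tiny = TinyLock()
lock_id = tiny.lock()
wrong_release = tiny.free(lock_id + 1)  # a mismatched ID is rejected
right_release = tiny.free(lock_id)      # the matching ID releases the lock
print(wrong_release, right_release, tiny.nolock())
```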
    def vault(self) -> dict:
        """
        Returns a copy of the internal vault dictionary.

        This method is used to retrieve the current state of the ZakatTracker object.
        It provides a snapshot of the internal data structure, allowing for further
        processing or analysis.

        Returns:
        dict: A copy of the internal vault dictionary.
        """
        return self._vault.copy()
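Returns a shallow copy of the internal vault dictionary.
This method is used to retrieve the current state of the ZakatTracker object. It provides a view of the internal data structure for further processing or analysis. Because the copy is shallow, the nested dictionaries (accounts, history, and so on) are shared with the tracker, so they should be treated as read-only.
Returns: dict: A shallow copy of the internal vault dictionary.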
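Since the listing returns `self._vault.copy()`, the result is a shallow copy in Python's sense. The sketch below, using an invented miniature vault, shows the practical consequence: changes to nested dictionaries through a shallow copy are visible in the original, whereas `copy.deepcopy` produces an independent snapshot.

```python
import copy

vault = {'account': {'safe': {'balance': 100}}, 'lock': None}

shallow = vault.copy()
shallow['account']['safe']['balance'] = 0
after_shallow = vault['account']['safe']['balance']
print(after_shallow)  # 0: nested dicts are shared with the original

vault['account']['safe']['balance'] = 100
deep = copy.deepcopy(vault)
deep['account']['safe']['balance'] = 0
after_deep = vault['account']['safe']['balance']
print(after_deep)  # 100: the deep copy is independent
```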
    def stats(self) -> dict[str, tuple]:
        """
        Calculates and returns statistics about the object's data storage.

        This method determines the size of the database file on disk and the
        size of the data currently held in RAM (likely within a dictionary).
        Both sizes are reported in bytes and in a human-readable format
        (e.g., KB, MB).

        Returns:
        dict[str, tuple]: A dictionary containing the following statistics:

            * 'database': A tuple with two elements:
                - The database file size in bytes (int).
                - The database file size in human-readable format (str).
            * 'ram': A tuple with two elements:
                - The RAM usage (dictionary size) in bytes (int).
                - The RAM usage in human-readable format (str).

        Example:
        >>> stats = my_object.stats()
        >>> print(stats['database'])
        (256000, '250.0 KB')
        >>> print(stats['ram'])
        (12345, '12.1 KB')
        """
        ram_size = self.get_dict_size(self.vault())
        file_size = os.path.getsize(self.path())
        return {
            'database': (file_size, self.human_readable_size(file_size)),
            'ram': (ram_size, self.human_readable_size(ram_size)),
        }
Calculates and returns statistics about the object's data storage.
This method determines the size of the database file on disk and the size of the data currently held in RAM (likely within a dictionary). Both sizes are reported in bytes and in a human-readable format (e.g., KB, MB).
Returns: dict[str, tuple]: A dictionary containing the following statistics:
* 'database': A tuple with two elements:
    - The database file size in bytes (int).
    - The database file size in human-readable format (str).
* 'ram': A tuple with two elements:
    - The RAM usage (dictionary size) in bytes (int).
    - The RAM usage in human-readable format (str).
Example:
>>> stats = my_object.stats()
>>> print(stats['database'])
(256000, '250.0 KB')
>>> print(stats['ram'])
(12345, '12.1 KB')
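The human-readable strings in the example are consistent with dividing by 1024 and keeping one decimal place. The helper below, `format_size`, is a hypothetical sketch of such a formatter and is not taken from the library; it reproduces the two sample values shown above.

```python
def format_size(size: float, decimal_places: int = 1) -> str:
    """Hypothetical formatter: repeatedly divide by 1024 until the value fits a unit."""
    units = ('B', 'KB', 'MB', 'GB', 'TB')
    for unit in units:
        if size < 1024 or unit == units[-1]:
            return f"{size:.{decimal_places}f} {unit}"
        size /= 1024


print(format_size(256000))  # 250.0 KB
print(format_size(12345))   # 12.1 KB
```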
    def files(self) -> list[dict[str, str | int]]:
        """
        Retrieves information about files associated with this class.

        This method provides a standardized way to gather details about
        files used by the class for storage, snapshots, and CSV imports.

        Returns:
        list[dict[str, str | int]]: A list of dictionaries, each containing information
        about a specific file:

            * type (str): The type of file ('database', 'snapshot', 'import_csv').
            * path (str): The full file path.
            * exists (bool): Whether the file exists on the filesystem.
            * size (int): The file size in bytes (0 if the file doesn't exist).
            * human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB').

        Example:
        ```
        file_info = my_object.files()
        for info in file_info:
            print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}")
        ```
        """
        result = []
        for file_type, path in {
            'database': self.path(),
            'snapshot': self.snapshot_cache_path(),
            'import_csv': self.import_csv_cache_path(),
        }.items():
            exists = os.path.exists(path)
            size = os.path.getsize(path) if exists else 0
            human_readable_size = self.human_readable_size(size) if exists else 0
            result.append({
                'type': file_type,
                'path': path,
                'exists': exists,
                'size': size,
                'human_readable_size': human_readable_size,
            })
        return result
Retrieves information about files associated with this class.
This method (called on a tracker instance, since it takes `self`) provides a standardized way to gather details about the files the tracker uses for storage, snapshots, and CSV imports.
Returns: list[dict[str, str | int]]: A list of dictionaries, each containing information about a specific file:
* type (str): The type of file ('database', 'snapshot', 'import_csv').
* path (str): The full file path.
* exists (bool): Whether the file exists on the filesystem.
* size (int): The file size in bytes (0 if the file doesn't exist).
* human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB'). In the source shown for this method, the integer 0 is stored instead when the file doesn't exist.
Example:
file_info = tracker.files()  # tracker is a ZakatTracker instance
for info in file_info:
    print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}")
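The same per-file bookkeeping can be tried without the library. The sketch below is a simplified stand-in: `describe_files` is a hypothetical helper name, it omits the human-readable size, and the file names are made up for the demonstration.

```python
import os
import tempfile


def describe_files(paths: dict[str, str]) -> list[dict]:
    """Collect type, path, existence and size for each named file."""
    result = []
    for file_type, path in paths.items():
        exists = os.path.exists(path)
        result.append({
            "type": file_type,
            "path": path,
            "exists": exists,
            # A missing file is reported with size 0 instead of raising.
            "size": os.path.getsize(path) if exists else 0,
        })
    return result


with tempfile.TemporaryDirectory() as tmp:
    db = os.path.join(tmp, "zakat.camel")
    with open(db, "wb") as f:
        f.write(b"x" * 10)
    info = describe_files({
        "database": db,
        "snapshot": os.path.join(tmp, "missing.camel"),
    })
    for entry in info:
        print(entry["type"], entry["exists"], entry["size"])
```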
    def steps(self) -> dict:
        """
        Returns a copy of the history of steps taken in the ZakatTracker.

        The history is a dictionary where each key is a unique identifier for a step,
        and the corresponding value is a dictionary containing information about the step.

        Returns:
        dict: A copy of the history of steps taken in the ZakatTracker.
        """
        return self._vault['history'].copy()
Returns a copy of the history of steps taken in the ZakatTracker.
The history is a dictionary where each key is a unique identifier for a step, and the corresponding value is a dictionary containing information about the step.
Returns: dict: A copy of the history of steps taken in the ZakatTracker.
    def free(self, lock: int, auto_save: bool = True) -> bool:
        """
        Releases the lock on the database.

        Parameters:
        lock (int): The lock ID to be released.
        auto_save (bool): Whether to automatically save the database after releasing the lock.

        Returns:
        bool: True if the lock is successfully released and (optionally) saved, False otherwise.
        """
        if lock == self._vault['lock']:
            self._vault['lock'] = None
            self.clean_history(lock)
            if auto_save:
                return self.save(self.path())
            return True
        return False
Releases the lock on the database.
Parameters: lock (int): The lock ID to be released. auto_save (bool): Whether to automatically save the database after releasing the lock.
Returns: bool: True if the lock is successfully released and (optionally) saved, False otherwise.
    def account_exists(self, account) -> bool:
        """
        Check if the given account exists in the vault.

        Parameters:
        account (str): The account number to check.

        Returns:
        bool: True if the account exists, False otherwise.
        """
        return account in self._vault['account']
Check if the given account exists in the vault.
Parameters: account (str): The account number to check.
Returns: bool: True if the account exists, False otherwise.
    def box_size(self, account) -> int:
        """
        Calculate the size of the box for a specific account.

        Parameters:
        account (str): The account number for which the box size needs to be calculated.

        Returns:
        int: The size of the box for the given account. If the account does not exist, -1 is returned.
        """
        if self.account_exists(account):
            return len(self._vault['account'][account]['box'])
        return -1
Calculate the size of the box for a specific account.
Parameters: account (str): The account number for which the box size needs to be calculated.
Returns: int: The size of the box for the given account. If the account does not exist, -1 is returned.
    def log_size(self, account) -> int:
        """
        Get the size of the log for a specific account.

        Parameters:
        account (str): The account number for which the log size needs to be calculated.

        Returns:
        int: The size of the log for the given account. If the account does not exist, -1 is returned.
        """
        if self.account_exists(account):
            return len(self._vault['account'][account]['log'])
        return -1
Get the size of the log for a specific account.
Parameters: account (str): The account number for which the log size needs to be calculated.
Returns: int: The size of the log for the given account. If the account does not exist, -1 is returned.
    @staticmethod
    def file_hash(file_path: str, algorithm: str = "blake2b") -> str:
        """
        Calculates the hash of a file using the specified algorithm.

        Parameters:
        file_path (str): The path to the file.
        algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b".

        Returns:
        str: The hexadecimal representation of the file's hash.
        """
        hash_obj = hashlib.new(algorithm)  # Create the hash object
        with open(file_path, "rb") as f:  # Open file in binary mode for reading
            for chunk in iter(lambda: f.read(4096), b""):  # Read file in chunks
                hash_obj.update(chunk)
        return hash_obj.hexdigest()  # Return the hash as a hexadecimal string
Calculates the hash of a file using the specified algorithm.
Parameters: file_path (str): The path to the file. algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b".
Returns: str: The hexadecimal representation of the file's hash.
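Chunked hashing gives the same digest as hashing the whole file at once while keeping memory use bounded. The sketch below is a standalone copy of the approach using only the standard library; the `chunk_size` parameter and the temporary file are added here for the demonstration.

```python
import hashlib
import os
import tempfile


def file_hash(file_path: str, algorithm: str = "blake2b", chunk_size: int = 4096) -> str:
    """Hash a file incrementally, reading chunk_size bytes at a time."""
    hash_obj = hashlib.new(algorithm)
    with open(file_path, "rb") as f:
        # iter() with a sentinel keeps calling f.read until it returns b"".
        for chunk in iter(lambda: f.read(chunk_size), b""):
            hash_obj.update(chunk)
    return hash_obj.hexdigest()


data = b"zakat" * 5000  # larger than one chunk
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(data)
digest = file_hash(tmp.name)
os.remove(tmp.name)

# The incremental digest matches a one-shot digest of the same bytes.
print(digest == hashlib.blake2b(data).hexdigest())
```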
    def snapshot_cache_path(self):
        """
        Generate the path for the cache file used to store snapshots.

        The cache file is a camel file that stores the timestamps of the snapshots.
        The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel".

        Returns:
        str: The path to the cache file.
        """
        path = str(self.path())
        ext = self.ext()
        ext_len = len(ext)
        if path.endswith(f'.{ext}'):
            path = path[:-ext_len-1]
        _, filename = os.path.split(path + f'.snapshots.{ext}')
        return self.base_path(filename)
Generate the path for the cache file used to store snapshots.
The cache file is a camel file that stores the timestamps of the snapshots. The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel".
Returns: str: The path to the cache file.
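The renaming rule can be checked in isolation. This is a sketch of the file-name derivation only: `snapshot_cache_name` is a hypothetical helper, and it returns just the file name rather than joining it to the tracker's base directory as the method does.

```python
import os


def snapshot_cache_name(db_path: str, ext: str = "camel") -> str:
    """Derive '<name>.snapshots.<ext>' from a database path."""
    suffix = f".{ext}"
    if db_path.endswith(suffix):
        db_path = db_path[:-len(suffix)]  # drop the trailing '.camel'
    return os.path.basename(f"{db_path}.snapshots.{ext}")


print(snapshot_cache_name("/data/zakat.camel"))  # zakat.snapshots.camel
```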
    def snapshot(self) -> bool:
        """
        This function creates a snapshot of the current database state.

        The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists.
        If a snapshot with the same hash exists, the function returns True without creating a new snapshot.
        If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state
        in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp.

        Parameters:
        None

        Returns:
        bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails.
        """
        current_hash = self.file_hash(self.path())
        cache: dict[str, int] = {}  # hash: time_ns
        try:
            with open(self.snapshot_cache_path(), 'r') as stream:
                cache = camel.load(stream.read())
        except:
            pass
        if current_hash in cache:
            return True
        time = time_ns()
        cache[current_hash] = time
        if not self.save(self.base_path('snapshots', f'{time}.{self.ext()}')):
            return False
        with open(self.snapshot_cache_path(), 'w') as stream:
            stream.write(camel.dump(cache))
        return True
This function creates a snapshot of the current database state.
The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. If a snapshot with the same hash exists, the function returns True without creating a new snapshot. If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp.
Parameters: None
Returns: bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails.
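The hash-based deduplication described above can be sketched without the library. This version makes two simplifying assumptions that differ from the method: it copies the database file instead of calling `save`, and it keeps the hash-to-timestamp cache in a plain in-memory dict rather than a camel file. `take_snapshot` is a hypothetical name.

```python
import hashlib
import os
import shutil
import tempfile
import time


def take_snapshot(db_path: str, snapshot_dir: str, cache: dict[str, int]) -> bool:
    """Copy db_path into snapshot_dir unless identical content was already captured."""
    with open(db_path, "rb") as f:
        current_hash = hashlib.blake2b(f.read()).hexdigest()
    if current_hash in cache:
        return True  # same content already has a snapshot
    stamp = time.time_ns()
    os.makedirs(snapshot_dir, exist_ok=True)
    shutil.copyfile(db_path, os.path.join(snapshot_dir, f"{stamp}.camel"))
    cache[current_hash] = stamp  # hash -> nanosecond timestamp
    return True


with tempfile.TemporaryDirectory() as tmp:
    db = os.path.join(tmp, "zakat.camel")
    with open(db, "w") as f:
        f.write("state-1")
    snapshot_dir = os.path.join(tmp, "snapshots")
    cache: dict[str, int] = {}
    take_snapshot(db, snapshot_dir, cache)
    take_snapshot(db, snapshot_dir, cache)  # unchanged file: no second copy
    snapshot_count = len(os.listdir(snapshot_dir))
    print(snapshot_count, len(cache))
```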
    def snapshots(self, hide_missing: bool = True, verified_hash_only: bool = False) \
            -> dict[int, tuple[str, str, bool]]:
        """
        Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status.

        Parameters:
        - hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True.
        - verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False.

        Returns:
        - dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots,
        and the values are tuples containing the snapshot's hash, path, and existence status.
        """
        cache: dict[str, int] = {}  # hash: time_ns
        try:
            with open(self.snapshot_cache_path(), 'r') as stream:
                cache = camel.load(stream.read())
        except:
            pass
        if not cache:
            return {}
        result: dict[int, tuple[str, str, bool]] = {}  # time_ns: (hash, path, exists)
        for file_hash, ref in cache.items():
            path = self.base_path('snapshots', f'{ref}.{self.ext()}')
            exists = os.path.exists(path)
            valid_hash = self.file_hash(path) == file_hash if verified_hash_only else True
            if (verified_hash_only and not valid_hash) or (verified_hash_only and not exists):
                continue
            if exists or not hide_missing:
                result[ref] = (file_hash, path, exists)
        return result
Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status.
Parameters:
- hide_missing (bool): If True, only include snapshots whose files still exist on disk. Default is True.
- verified_hash_only (bool): If True, only include snapshots whose file content still matches the hash recorded in the cache. Default is False.
Returns:
- dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, and the values are tuples containing the snapshot's hash, path, and existence status.
    def recall(self, dry=True, debug=False) -> bool:
        """
        Revert the last operation.

        Parameters:
        dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True.
        debug (bool): If True, the function will print debug information. Default is False.

        Returns:
        bool: True if the operation was successful, False otherwise.
        """
        if not self.nolock() or len(self._vault['history']) == 0:
            return False
        if len(self._vault['history']) <= 0:
            return False
        ref = sorted(self._vault['history'].keys())[-1]
        if debug:
            print('recall', ref)
        memory = self._vault['history'][ref]
        if debug:
            print(type(memory), 'memory', memory)
        limit = len(memory) + 1
        sub_positive_log_negative = 0
        for i in range(-1, -limit, -1):
            x = memory[i]
            if debug:
                print(type(x), x)
            match x['action']:
                case Action.CREATE:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if debug:
                                print('account', self._vault['account'][x['account']])
                            assert len(self._vault['account'][x['account']]['box']) == 0
                            assert self._vault['account'][x['account']]['balance'] == 0
                            assert self._vault['account'][x['account']]['count'] == 0
                            if dry:
                                continue
                            del self._vault['account'][x['account']]

                case Action.TRACK:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if dry:
                                continue
                            self._vault['account'][x['account']]['balance'] -= x['value']
                            self._vault['account'][x['account']]['count'] -= 1
                            del self._vault['account'][x['account']]['box'][x['ref']]

                case Action.LOG:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['log']:
                                if dry:
                                    continue
                                if sub_positive_log_negative == -x['value']:
                                    self._vault['account'][x['account']]['count'] -= 1
                                    sub_positive_log_negative = 0
                                box_ref = self._vault['account'][x['account']]['log'][x['ref']]['ref']
                                if not box_ref is None:
                                    assert self.box_exists(x['account'], box_ref)
                                    box_value = self._vault['account'][x['account']]['log'][x['ref']]['value']
                                    assert box_value < 0

                                    try:
                                        self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value
                                    except TypeError:
                                        self._vault['account'][x['account']]['box'][box_ref]['rest'] += Decimal(
                                            -box_value)

                                    try:
                                        self._vault['account'][x['account']]['balance'] += -box_value
                                    except TypeError:
                                        self._vault['account'][x['account']]['balance'] += Decimal(-box_value)

                                    self._vault['account'][x['account']]['count'] -= 1
                                del self._vault['account'][x['account']]['log'][x['ref']]

                case Action.SUB:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['box']:
                                if dry:
                                    continue
                                self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value']
                                self._vault['account'][x['account']]['balance'] += x['value']
                                sub_positive_log_negative = x['value']

                case Action.ADD_FILE:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['log']:
                                if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']:
                                    if dry:
                                        continue
                                    del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']]

                case Action.REMOVE_FILE:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['log']:
                                if dry:
                                    continue
                                self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value']

                case Action.BOX_TRANSFER:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['box']:
                                if dry:
                                    continue
                                self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value']

                case Action.EXCHANGE:
                    if x['account'] is not None:
                        if x['account'] in self._vault['exchange']:
                            if x['ref'] in self._vault['exchange'][x['account']]:
                                if dry:
                                    continue
                                del self._vault['exchange'][x['account']][x['ref']]

                case Action.REPORT:
                    if x['ref'] in self._vault['report']:
                        if dry:
                            continue
                        del self._vault['report'][x['ref']]

                case Action.ZAKAT:
                    if x['account'] is not None:
                        if self.account_exists(x['account']):
                            if x['ref'] in self._vault['account'][x['account']]['box']:
                                if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]:
                                    if dry:
                                        continue
                                    match x['math']:
                                        case MathOperation.ADDITION:
                                            self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x[
                                                'value']
                                        case MathOperation.EQUAL:
                                            self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value']
                                        case MathOperation.SUBTRACTION:
                                            self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x[
                                                'value']

        if not dry:
            del self._vault['history'][ref]
        return True
Revert the last operation.
Parameters: dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. debug (bool): If True, the function will print debug information. Default is False.
Returns: bool: True if the operation was successful, False otherwise.
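The underlying idea is an undo log: every mutation records a step, and reverting replays the most recent group of steps in reverse order, applying the inverse of each. The following is a deliberately tiny sketch of that pattern, not the library's implementation; the `Ledger` class, its single `"TRACK"` action, and the integer lock ids are assumptions made for illustration.

```python
class Ledger:
    """Toy ledger with an undo history grouped by lock id (hypothetical)."""

    def __init__(self):
        self.balance = 0
        self.boxes = {}    # ref -> value
        self.history = {}  # lock id -> list of recorded steps

    def track(self, lock: int, ref: int, value: int) -> None:
        self.boxes[ref] = value
        self.balance += value
        step = {"action": "TRACK", "ref": ref, "value": value}
        self.history.setdefault(lock, []).append(step)

    def recall(self, dry: bool = True) -> bool:
        if not self.history:
            return False
        lock = max(self.history)  # most recent group of steps
        if dry:
            return True  # simulate only: nothing is modified
        for step in reversed(self.history[lock]):
            if step["action"] == "TRACK":
                # Inverse of track: remove the box and subtract its value.
                self.balance -= step["value"]
                del self.boxes[step["ref"]]
        del self.history[lock]
        return True


ledger = Ledger()
ledger.track(lock=1, ref=100, value=50)
ledger.track(lock=2, ref=200, value=25)
ledger.recall(dry=True)    # balance stays 75
ledger.recall(dry=False)   # undoes the second track
print(ledger.balance, sorted(ledger.boxes))
```

As in the documented method, `dry=True` is the default, so a caller has to opt in explicitly before any data is changed.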
    def ref_exists(self, account: str, ref_type: str, ref: int) -> bool:
        """
        Check if a specific reference (transaction) exists in the vault for a given account and reference type.

        Parameters:
        account (str): The account number for which to check the existence of the reference.
        ref_type (str): The type of reference (e.g., 'box', 'log', etc.).
        ref (int): The reference (transaction) number to check for existence.

        Returns:
        bool: True if the reference exists for the given account and reference type, False otherwise.
        """
        if account in self._vault['account']:
            return ref in self._vault['account'][account][ref_type]
        return False
Check if a specific reference (transaction) exists in the vault for a given account and reference type.
Parameters: account (str): The account number for which to check the existence of the reference. ref_type (str): The type of reference (e.g., 'box', 'log', etc.). ref (int): The reference (transaction) number to check for existence.
Returns: bool: True if the reference exists for the given account and reference type, False otherwise.
    def box_exists(self, account: str, ref: int) -> bool:
        """
        Check if a specific box (transaction) exists in the vault for a given account and reference.

        Parameters:
        - account (str): The account number for which to check the existence of the box.
        - ref (int): The reference (transaction) number to check for existence.

        Returns:
        - bool: True if the box exists for the given account and reference, False otherwise.
        """
        return self.ref_exists(account, 'box', ref)
Check if a specific box (transaction) exists in the vault for a given account and reference.
Parameters:
- account (str): The account number for which to check the existence of the box.
- ref (int): The reference (transaction) number to check for existence.
Returns:
- bool: True if the box exists for the given account and reference, False otherwise.
    def track(self, unscaled_value: float | int | Decimal = 0, desc: str = '', account: str = 1, logging: bool = True,
              created: int = None,
              debug: bool = False) -> int:
        """
        This function tracks a transaction for a specific account.

        Parameters:
        unscaled_value (float | int | Decimal): The value of the transaction. Default is 0.
        desc (str): The description of the transaction. Default is an empty string.
        account (str): The account for which the transaction is being tracked. Default is '1'.
        logging (bool): Whether to log the transaction. Default is True.
        created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None.
        debug (bool): Whether to print debug information. Default is False.

        Returns:
        int: The timestamp of the transaction.

        This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box.

        Raises:
        ValueError: The log transaction happened again in the same nanosecond time.
        ValueError: The box transaction happened again in the same nanosecond time.
        """
        if debug:
            print('track', f'unscaled_value={unscaled_value}, debug={debug}')
        if created is None:
            created = self.time()
        no_lock = self.nolock()
        self.lock()
        if not self.account_exists(account):
            if debug:
                print(f"account {account} created")
            self._vault['account'][account] = {
                'balance': 0,
                'box': {},
                'count': 0,
                'log': {},
                'hide': False,
                'zakatable': True,
            }
            self._step(Action.CREATE, account)
        if unscaled_value == 0:
            if no_lock:
                self.free(self.lock())
            return 0
        value = self.scale(unscaled_value)
        if logging:
            self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug)
        if debug:
            print('create-box', created)
        if self.box_exists(account, created):
            raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).")
        if debug:
            print('created-box', created)
        self._vault['account'][account]['box'][created] = {
            'capital': value,
            'count': 0,
            'last': 0,
            'rest': value,
            'total': 0,
        }
        self._step(Action.TRACK, account, ref=created, value=value)
        if no_lock:
            self.free(self.lock())
        return created
This function tracks a transaction for a specific account.
Parameters:
- unscaled_value (float | int | Decimal): The value of the transaction. Default is 0.
- desc (str): The description of the transaction. Default is an empty string.
- account (str): The account for which the transaction is being tracked. The signature's default is the integer 1, although the parameter is annotated as str.
- logging (bool): Whether to log the transaction. Default is True.
- created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None.
- debug (bool): Whether to print debug information. Default is False.
Returns:
- int: The timestamp of the transaction, or 0 if unscaled_value is 0 (the account is still created if missing, but no box is added).
This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box.
Raises:
- ValueError: The log transaction happened again in the same nanosecond time.
- ValueError: The box transaction happened again in the same nanosecond time.
    def log_exists(self, account: str, ref: int) -> bool:
        """
        Checks if a specific transaction log entry exists for a given account.

        Parameters:
        account (str): The account number associated with the transaction log.
        ref (int): The reference to the transaction log entry.

        Returns:
        bool: True if the transaction log entry exists, False otherwise.
        """
        return self.ref_exists(account, 'log', ref)
Checks if a specific transaction log entry exists for a given account.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry.
Returns: bool: True if the transaction log entry exists, False otherwise.
    def exchange(self, account, created: int = None, rate: float = None, description: str = None,
                 debug: bool = False) -> dict:
        """
        This method is used to record or retrieve exchange rates for a specific account.

        Parameters:
        - account (str): The account number for which the exchange rate is being recorded or retrieved.
        - created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used.
        - rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate.
        - description (str): A description of the exchange rate.

        Returns:
        - dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found,
        it returns a dictionary with default values for the rate and description.
        """
        if debug:
            print('exchange', f'debug={debug}')
        if created is None:
            created = self.time()
        no_lock = self.nolock()
        self.lock()
        if rate is not None:
            if rate <= 0:
                return dict()
            if account not in self._vault['exchange']:
                self._vault['exchange'][account] = {}
            if len(self._vault['exchange'][account]) == 0 and rate <= 1:
                return {"time": created, "rate": 1, "description": None}
            self._vault['exchange'][account][created] = {"rate": rate, "description": description}
            self._step(Action.EXCHANGE, account, ref=created, value=rate)
            if no_lock:
                self.free(self.lock())
            if debug:
                print("exchange-created-1",
                      f'account: {account}, created: {created}, rate:{rate}, description:{description}')

        if account in self._vault['exchange']:
            valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created]
            if valid_rates:
                latest_rate = max(valid_rates, key=lambda x: x[0])
                if debug:
                    print("exchange-read-1",
                          f'account: {account}, created: {created}, rate:{rate}, description:{description}',
                          'latest_rate', latest_rate)
                result = latest_rate[1]
                result['time'] = latest_rate[0]
                return result  # Return a dictionary containing the rate and description
        if debug:
            print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}')
        return {"time": created, "rate": 1, "description": None}  # Return the default value with an empty description
This method is used to record or retrieve exchange rates for a specific account.
Parameters:
- account (str): The account number for which the exchange rate is being recorded or retrieved.
- created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used.
- rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate.
- description (str): A description of the exchange rate.
Returns:
- dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, it returns a dictionary with default values for the rate and description.
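The retrieval half of this behaviour is a "latest entry at or before a timestamp" lookup with a default of rate 1. A minimal sketch follows, assuming the rates for one account are held in a plain dict keyed by timestamp; `latest_rate` is a hypothetical helper, and unlike the method's source it returns a copy instead of adding the `time` key to the stored entry.

```python
def latest_rate(rates: dict[int, dict], created: int) -> dict:
    """Return the most recent rate recorded at or before `created`."""
    valid = [ts for ts in rates if ts <= created]
    if not valid:
        # No applicable rate: fall back to a neutral rate of 1.
        return {"time": created, "rate": 1, "description": None}
    ts = max(valid)
    return {**rates[ts], "time": ts}


rates = {
    10: {"rate": 3.75, "description": "first quote"},
    20: {"rate": 3.80, "description": "second quote"},
}
print(latest_rate(rates, 15))  # uses the entry recorded at time 10
print(latest_rate(rates, 5))   # nothing recorded yet: default rate 1
```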
    @staticmethod
    def exchange_calc(x: float, x_rate: float, y_rate: float) -> float:
        """
        This function calculates the exchanged amount of a currency.

        Args:
        x (float): The original amount of the currency.
        x_rate (float): The exchange rate of the original currency.
        y_rate (float): The exchange rate of the target currency.

        Returns:
        float: The exchanged amount of the target currency.
        """
        return (x * x_rate) / y_rate
This function calculates the exchanged amount of a currency.
Args: x (float): The original amount of the currency. x_rate (float): The exchange rate of the original currency. y_rate (float): The exchange rate of the target currency.
Returns: float: The exchanged amount of the target currency.
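A short worked example of the formula `(x * x_rate) / y_rate`. The rates here are made-up numbers: the source account is assumed to have a rate of 3.75 relative to a base currency, and the target account a rate of 1.

```python
def exchange_calc(x: float, x_rate: float, y_rate: float) -> float:
    """Convert amount x from a currency with x_rate to one with y_rate."""
    return (x * x_rate) / y_rate


# 100 units at rate 3.75 expressed in a currency with rate 1.
converted = exchange_calc(100, 3.75, 1)
# Converting back swaps the two rates.
round_trip = exchange_calc(converted, 1, 3.75)
print(converted, round_trip)  # 375.0 100.0
```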
    def exchanges(self) -> dict:
        """
        Retrieve the recorded exchange rates for all accounts.

        Parameters:
        None

        Returns:
        dict: A dictionary containing all recorded exchange rates.
        The keys are account names or numbers, and the values are dictionaries containing the exchange rates.
        Each exchange rate dictionary has timestamps as keys and exchange rate details as values.
        """
        return self._vault['exchange'].copy()
Retrieve the recorded exchange rates for all accounts.
Parameters: None
Returns: dict: A dictionary containing all recorded exchange rates. The keys are account names or numbers, and the values are dictionaries containing the exchange rates. Each exchange rate dictionary has timestamps as keys and exchange rate details as values.
    def accounts(self) -> dict:
        """
        Returns a dictionary containing account numbers as keys and their respective balances as values.

        Parameters:
        None

        Returns:
        dict: A dictionary where keys are account numbers and values are their respective balances.
        """
        result = {}
        for i in self._vault['account']:
            result[i] = self._vault['account'][i]['balance']
        return result
Returns a dictionary containing account numbers as keys and their respective balances as values.
Parameters: None
Returns: dict: A dictionary where keys are account numbers and values are their respective balances.
    def boxes(self, account) -> dict:
        """
        Retrieve the boxes (transactions) associated with a specific account.

        Parameters:
        account (str): The account number for which to retrieve the boxes.

        Returns:
        dict: A dictionary containing the boxes associated with the given account.
        If the account does not exist, an empty dictionary is returned.
        """
        if self.account_exists(account):
            return self._vault['account'][account]['box']
        return {}
Retrieve the boxes (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the boxes.
Returns: dict: A dictionary containing the boxes associated with the given account. If the account does not exist, an empty dictionary is returned.
    def logs(self, account) -> dict:
        """
        Retrieve the logs (transactions) associated with a specific account.

        Parameters:
        account (str): The account number for which to retrieve the logs.

        Returns:
        dict: A dictionary containing the logs associated with the given account.
        If the account does not exist, an empty dictionary is returned.
        """
        if self.account_exists(account):
            return self._vault['account'][account]['log']
        return {}
Retrieve the logs (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the logs.
Returns: dict: A dictionary containing the logs associated with the given account. If the account does not exist, an empty dictionary is returned.
    def add_file(self, account: str, ref: int, path: str) -> int:
        """
        Adds a file reference to a specific transaction log entry in the vault.

        Parameters:
        account (str): The account number associated with the transaction log.
        ref (int): The reference to the transaction log entry.
        path (str): The path of the file to be added.

        Returns:
        int: The reference of the added file. If the account or transaction log entry does not exist, returns 0.
        """
        if self.account_exists(account):
            if ref in self._vault['account'][account]['log']:
                file_ref = self.time()
                self._vault['account'][account]['log'][ref]['file'][file_ref] = path
                no_lock = self.nolock()
                self.lock()
                self._step(Action.ADD_FILE, account, ref=ref, file=file_ref)
                if no_lock:
                    self.free(self.lock())
                return file_ref
        return 0
Adds a file reference to a specific transaction log entry in the vault.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry. path (str): The path of the file to be added.
Returns: int: The reference of the added file. If the account or transaction log entry does not exist, returns 0.
    def remove_file(self, account: str, ref: int, file_ref: int) -> bool:
        """
        Removes a file reference from a specific transaction log entry in the vault.

        Parameters:
        account (str): The account number associated with the transaction log.
        ref (int): The reference to the transaction log entry.
        file_ref (int): The reference of the file to be removed.

        Returns:
        bool: True if the file reference is successfully removed, False otherwise.
        """
        if self.account_exists(account):
            if ref in self._vault['account'][account]['log']:
                if file_ref in self._vault['account'][account]['log'][ref]['file']:
                    x = self._vault['account'][account]['log'][ref]['file'][file_ref]
                    del self._vault['account'][account]['log'][ref]['file'][file_ref]
                    no_lock = self.nolock()
                    self.lock()
                    self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x)
                    if no_lock:
                        self.free(self.lock())
                    return True
        return False
Removes a file reference from a specific transaction log entry in the vault.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry. file_ref (int): The reference of the file to be removed.
Returns: bool: True if the file reference is successfully removed, False otherwise.
    def balance(self, account: str = 1, cached: bool = True) -> int:
        """
        Calculate and return the balance of a specific account.

        Parameters:
        account (str): The account number. Default is '1'.
        cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True.

        Returns:
        int: The balance of the account.

        Note:
        If cached is True, the function returns the cached balance.
        If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items.
        """
        if cached:
            return self._vault['account'][account]['balance']
        x = 0
        return [x := x + y['rest'] for y in self._vault['account'][account]['box'].values()][-1]
Calculate and return the balance of a specific account.
Parameters: account (str): The account number. Default is '1'. cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True.
Returns: int: The balance of the account.
Note: If cached is True, the function returns the cached balance. If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items.
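The two modes described in the note can be sketched on a hand-built dictionary shaped like one `_vault['account'][...]` entry. The sample values and the helper names `cached_balance` and `computed_balance` are illustrative assumptions, not part of the library.

```python
# Hypothetical account entry shaped like _vault['account'][account].
# Keys of 'box' stand in for nanosecond timestamps; values are made up.
account_entry = {
    'balance': 1500,
    'box': {
        1700000000000000000: {'capital': 1000, 'rest': 600},
        1700000001000000000: {'capital': 900, 'rest': 900},
    },
}


def cached_balance(entry: dict) -> int:
    """Return the stored balance without touching the boxes."""
    return entry['balance']


def computed_balance(entry: dict) -> int:
    """Recompute the balance by summing the 'rest' of every box."""
    return sum(box['rest'] for box in entry['box'].values())


print(cached_balance(account_entry), computed_balance(account_entry))
```

Using `sum` here returns 0 for an account with no boxes, whereas indexing the last element of an empty list, as the listed implementation does, would raise an `IndexError` in that case.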
    def hide(self, account, status: bool = None) -> bool:
        """
        Check or set the hide status of a specific account.

        Parameters:
            account (str): The account number.
            status (bool, optional): The new hide status. If not provided, the function will return the current status.

        Returns:
            bool: The current or updated hide status of the account.

        Raises:
            None

        Example:
            >>> tracker = ZakatTracker()
            >>> ref = tracker.track(51, 'desc', 'account1')
            >>> tracker.hide('account1')  # Get the current hide status of 'account1' (False by default)
            False
            >>> tracker.hide('account1', True)  # Set the hide status of 'account1' to True
            True
            >>> tracker.hide('account1')  # Get the updated hide status of 'account1'
            True
            >>> tracker.hide('account1', False)
            False
        """
        if self.account_exists(account):
            if status is None:
                return self._vault['account'][account]['hide']
            self._vault['account'][account]['hide'] = status
            return status
        return False
Check or set the hide status of a specific account.
Parameters: account (str): The account number. status (bool, optional): The new hide status. If not provided, the function will return the current status.
Returns: bool: The current or updated hide status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.hide('account1')  # Get the current hide status of 'account1' (False by default)
False
>>> tracker.hide('account1', True)  # Set the hide status of 'account1' to True
True
>>> tracker.hide('account1')  # Get the updated hide status of 'account1'
True
>>> tracker.hide('account1', False)
False
    def zakatable(self, account, status: bool = None) -> bool:
        """
        Check or set the zakatable status of a specific account.

        Parameters:
            account (str): The account number.
            status (bool, optional): The new zakatable status. If not provided, the function will return the current status.

        Returns:
            bool: The current or updated zakatable status of the account.

        Raises:
            None

        Example:
            >>> tracker = ZakatTracker()
            >>> ref = tracker.track(51, 'desc', 'account1')
            >>> tracker.zakatable('account1')  # Get the current zakatable status of 'account1' (True by default)
            True
            >>> tracker.zakatable('account1', True)  # Set the zakatable status of 'account1' to True
            True
            >>> tracker.zakatable('account1')  # Get the updated zakatable status of 'account1'
            True
            >>> tracker.zakatable('account1', False)
            False
        """
        if self.account_exists(account):
            if status is None:
                return self._vault['account'][account]['zakatable']
            self._vault['account'][account]['zakatable'] = status
            return status
        return False
Check or set the zakatable status of a specific account.
Parameters: account (str): The account number. status (bool, optional): The new zakatable status. If not provided, the function will return the current status.
Returns: bool: The current or updated zakatable status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.zakatable('account1')  # Get the current zakatable status of 'account1' (True by default)
True
>>> tracker.zakatable('account1', True)  # Set the zakatable status of 'account1' to True
True
>>> tracker.zakatable('account1')  # Get the updated zakatable status of 'account1'
True
>>> tracker.zakatable('account1', False)
False
    def sub(self, unscaled_value: float | int | Decimal, desc: str = '', account: str = 1, created: int = None,
            debug: bool = False) \
            -> tuple[
                int,
                list[
                    tuple[int, int],
                ],
            ] | tuple:
        """
        Subtracts a specified value from an account's balance.

        Parameters:
            unscaled_value (float | int | Decimal): The amount to be subtracted.
            desc (str): A description for the transaction. Defaults to an empty string.
            account (str): The account from which the value will be subtracted. Defaults to '1'.
            created (int): The timestamp of the transaction. If not provided, the current timestamp will be used.
            debug (bool): A flag indicating whether to print debug information. Defaults to False.

        Returns:
            tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction.

        If the amount to subtract is greater than the account's balance,
        the remaining amount will be transferred to a new transaction with a negative value.

        Raises:
            ValueError: The box transaction happened again in the same nanosecond time.
            ValueError: The log transaction happened again in the same nanosecond time.
        """
        if debug:
            print('sub', f'debug={debug}')
        if unscaled_value < 0:
            return tuple()
        if unscaled_value == 0:
            ref = self.track(unscaled_value, '', account)
            return ref, ref
        if created is None:
            created = self.time()
        no_lock = self.nolock()
        self.lock()
        self.track(0, '', account)
        value = self.scale(unscaled_value)
        self._log(value=-value, desc=desc, account=account, created=created, ref=None, debug=debug)
        ids = sorted(self._vault['account'][account]['box'].keys())
        limit = len(ids) + 1
        target = value
        if debug:
            print('ids', ids)
        ages = []
        for i in range(-1, -limit, -1):
            if target == 0:
                break
            j = ids[i]
            if debug:
                print('i', i, 'j', j)
            rest = self._vault['account'][account]['box'][j]['rest']
            if rest >= target:
                self._vault['account'][account]['box'][j]['rest'] -= target
                self._step(Action.SUB, account, ref=j, value=target)
                ages.append((j, target))
                target = 0
                break
            elif target > rest > 0:
                chunk = rest
                target -= chunk
                self._step(Action.SUB, account, ref=j, value=chunk)
                ages.append((j, chunk))
                self._vault['account'][account]['box'][j]['rest'] = 0
        if target > 0:
            self.track(
                unscaled_value=self.unscale(-target),
                desc=desc,
                account=account,
                logging=False,
                created=created,
            )
            ages.append((created, target))
        if no_lock:
            self.free(self.lock())
        return created, ages
Subtracts a specified value from an account's balance.
Parameters:
    unscaled_value (float | int | Decimal): The amount to be subtracted.
    desc (str): A description for the transaction. Defaults to an empty string.
    account (str): The account from which the value will be subtracted. Defaults to '1'.
    created (int): The timestamp of the transaction. If not provided, the current timestamp will be used.
    debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: tuple: A tuple containing the timestamp of the transaction and a list of (box reference, amount taken) tuples, one for each box the value was drawn from. An empty tuple is returned if the value is negative.
If the amount to subtract is greater than the account's balance, the remaining amount will be transferred to a new transaction with a negative value.
Raises: ValueError: The box transaction happened again in the same nanosecond time. ValueError: The log transaction happened again in the same nanosecond time.
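The way `sub` consumes boxes can be sketched without the tracker. The listing walks the box timestamps from newest to oldest, takes what each box can give, and records any shortfall as a new negative box. In this simplified sketch `boxes` maps a timestamp directly to its remaining amount, and the function name `drain_boxes` is hypothetical; locking, logging and scaling are left out.

```python
def drain_boxes(boxes: dict[int, int], amount: int, created: int) -> list[tuple[int, int]]:
    """Take `amount` out of `boxes` (timestamp -> rest), newest box first.

    Returns (box timestamp, amount taken) pairs. A remainder that no box
    can cover is stored as a negative box keyed by `created`.
    """
    ages = []
    target = amount
    for ref in sorted(boxes, reverse=True):  # newest timestamp first
        if target == 0:
            break
        rest = boxes[ref]
        if rest <= 0:
            continue  # empty or negative boxes cannot give anything
        chunk = min(rest, target)
        boxes[ref] -= chunk
        target -= chunk
        ages.append((ref, chunk))
    if target > 0:
        # The account is overdrawn: record the shortfall as a negative box.
        boxes[created] = boxes.get(created, 0) - target
        ages.append((created, target))
    return ages


boxes = {1: 100, 2: 50}
print(drain_boxes(boxes, 120, created=3))  # [(2, 50), (1, 70)]
print(boxes)                               # {1: 30, 2: 0}
```

The returned pairs correspond to the second element of the tuple that `sub` returns, which `transfer` later uses to recreate the same amounts in the target account.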
1434 def transfer(self, unscaled_amount: float | int | Decimal, from_account: str, to_account: str, desc: str = '', 1435 created: int = None, 1436 debug: bool = False) -> list[int]: 1437 """ 1438 Transfers a specified value from one account to another. 1439 1440 Parameters: 1441 unscaled_amount (float | int | Decimal): The amount to be transferred. 1442 from_account (str): The account from which the value will be transferred. 1443 to_account (str): The account to which the value will be transferred. 1444 desc (str, optional): A description for the transaction. Defaults to an empty string. 1445 created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. 1446 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1447 1448 Returns: 1449 list[int]: A list of timestamps corresponding to the transactions made during the transfer. 1450 1451 Raises: 1452 ValueError: Transfer to the same account is forbidden. 1453 ValueError: The box transaction happened again in the same nanosecond time. 1454 ValueError: The log transaction happened again in the same nanosecond time. 1455 """ 1456 if debug: 1457 print('transfer', f'debug={debug}') 1458 if from_account == to_account: 1459 raise ValueError(f'Transfer to the same account is forbidden. 
{to_account}') 1460 if unscaled_amount <= 0: 1461 return [] 1462 if created is None: 1463 created = self.time() 1464 (_, ages) = self.sub(unscaled_amount, desc, from_account, created, debug=debug) 1465 times = [] 1466 source_exchange = self.exchange(from_account, created) 1467 target_exchange = self.exchange(to_account, created) 1468 1469 if debug: 1470 print('ages', ages) 1471 1472 for age, value in ages: 1473 target_amount = int(self.exchange_calc(value, source_exchange['rate'], target_exchange['rate'])) 1474 if debug: 1475 print('target_amount', target_amount) 1476 # Perform the transfer 1477 if self.box_exists(to_account, age): 1478 if debug: 1479 print('box_exists', age) 1480 capital = self._vault['account'][to_account]['box'][age]['capital'] 1481 rest = self._vault['account'][to_account]['box'][age]['rest'] 1482 if debug: 1483 print( 1484 f"Transfer(loop) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1485 selected_age = age 1486 if rest + target_amount > capital: 1487 self._vault['account'][to_account]['box'][age]['capital'] += target_amount 1488 selected_age = ZakatTracker.time() 1489 self._vault['account'][to_account]['box'][age]['rest'] += target_amount 1490 self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount) 1491 y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account, 1492 created=None, ref=None, debug=debug) 1493 times.append((age, y)) 1494 continue 1495 if debug: 1496 print( 1497 f"Transfer(func) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1498 y = self.track( 1499 unscaled_value=self.unscale(int(target_amount)), 1500 desc=desc, 1501 account=to_account, 1502 logging=True, 1503 created=age, 1504 debug=debug, 1505 ) 1506 times.append(y) 1507 return times
Transfers a specified value from one account to another.
Parameters:
    unscaled_amount (float | int | Decimal): The amount to be transferred.
    from_account (str): The account from which the value will be transferred.
    to_account (str): The account to which the value will be transferred.
    desc (str, optional): A description for the transaction. Defaults to an empty string.
    created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used.
    debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: list[int]: A list of timestamps corresponding to the transactions made during the transfer.
Raises: ValueError: Transfer to the same account is forbidden. ValueError: The box transaction happened again in the same nanosecond time. ValueError: The log transaction happened again in the same nanosecond time.
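During a transfer, each amount taken from the source account is converted into the target account's currency before it is recorded. The sketch below assumes that `exchange_calc(value, source_rate, target_rate)` computes `value * source_rate / target_rate`. That formula is not shown in this section, so treat it as an assumption; the names `convert` and `split_transfer` are hypothetical.

```python
def convert(value: float, source_rate: float, target_rate: float) -> float:
    """Assumed conversion rule: go through the base currency via the two rates."""
    return value * source_rate / target_rate


def split_transfer(ages: list[tuple[int, int]], source_rate: float,
                   target_rate: float) -> list[tuple[int, int]]:
    """Convert every (box timestamp, amount) pair into the target currency.

    The result is truncated with int(), as the listing does for target_amount.
    """
    return [(age, int(convert(value, source_rate, target_rate))) for age, value in ages]


# Two chunks taken from an account whose rate is 3.75, moved to a rate-1 account.
print(split_transfer([(10, 300), (20, 150)], 3.75, 1.0))  # [(10, 1125), (20, 562)]
```

Because each chunk keeps its original box timestamp, the transferred value retains its age in the target account.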
1509 def check(self, silver_gram_price: float, unscaled_nisab: float | int | Decimal = None, debug: bool = False, now: int = None, 1510 cycle: float = None) -> tuple: 1511 """ 1512 Check the eligibility for Zakat based on the given parameters. 1513 1514 Parameters: 1515 silver_gram_price (float): The price of a gram of silver. 1516 unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, 1517 it will be calculated based on the silver_gram_price. 1518 debug (bool): Flag to enable debug mode. 1519 now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). 1520 cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle(). 1521 1522 Returns: 1523 tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, 1524 and a dictionary containing the Zakat plan. 1525 """ 1526 if debug: 1527 print('check', f'debug={debug}') 1528 if now is None: 1529 now = self.time() 1530 if cycle is None: 1531 cycle = ZakatTracker.TimeCycle() 1532 if unscaled_nisab is None: 1533 unscaled_nisab = ZakatTracker.Nisab(silver_gram_price) 1534 nisab = self.scale(unscaled_nisab) 1535 plan = {} 1536 below_nisab = 0 1537 brief = [0, 0, 0] 1538 valid = False 1539 if debug: 1540 print('exchanges', self.exchanges()) 1541 for x in self._vault['account']: 1542 if not self.zakatable(x): 1543 continue 1544 _box = self._vault['account'][x]['box'] 1545 _log = self._vault['account'][x]['log'] 1546 limit = len(_box) + 1 1547 ids = sorted(self._vault['account'][x]['box'].keys()) 1548 for i in range(-1, -limit, -1): 1549 j = ids[i] 1550 rest = float(_box[j]['rest']) 1551 if rest <= 0: 1552 continue 1553 exchange = self.exchange(x, created=self.time()) 1554 rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1) 1555 brief[0] += rest 1556 index = limit + i - 1 1557 epoch = (now - j) / cycle 1558 if debug: 1559 print(f"Epoch: 
{epoch}", _box[j]) 1560 if _box[j]['last'] > 0: 1561 epoch = (now - _box[j]['last']) / cycle 1562 if debug: 1563 print(f"Epoch: {epoch}") 1564 epoch = floor(epoch) 1565 if debug: 1566 print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch) 1567 if epoch == 0: 1568 continue 1569 if debug: 1570 print("Epoch - PASSED") 1571 brief[1] += rest 1572 if rest >= nisab: 1573 total = 0 1574 for _ in range(epoch): 1575 total += ZakatTracker.ZakatCut(float(rest) - float(total)) 1576 if total > 0: 1577 if x not in plan: 1578 plan[x] = {} 1579 valid = True 1580 brief[2] += total 1581 plan[x][index] = { 1582 'total': total, 1583 'count': epoch, 1584 'box_time': j, 1585 'box_capital': _box[j]['capital'], 1586 'box_rest': _box[j]['rest'], 1587 'box_last': _box[j]['last'], 1588 'box_total': _box[j]['total'], 1589 'box_count': _box[j]['count'], 1590 'box_log': _log[j]['desc'], 1591 'exchange_rate': exchange['rate'], 1592 'exchange_time': exchange['time'], 1593 'exchange_desc': exchange['description'], 1594 } 1595 else: 1596 chunk = ZakatTracker.ZakatCut(float(rest)) 1597 if chunk > 0: 1598 if x not in plan: 1599 plan[x] = {} 1600 if j not in plan[x].keys(): 1601 plan[x][index] = {} 1602 below_nisab += rest 1603 brief[2] += chunk 1604 plan[x][index]['below_nisab'] = chunk 1605 plan[x][index]['total'] = chunk 1606 plan[x][index]['count'] = epoch 1607 plan[x][index]['box_time'] = j 1608 plan[x][index]['box_capital'] = _box[j]['capital'] 1609 plan[x][index]['box_rest'] = _box[j]['rest'] 1610 plan[x][index]['box_last'] = _box[j]['last'] 1611 plan[x][index]['box_total'] = _box[j]['total'] 1612 plan[x][index]['box_count'] = _box[j]['count'] 1613 plan[x][index]['box_log'] = _log[j]['desc'] 1614 plan[x][index]['exchange_rate'] = exchange['rate'] 1615 plan[x][index]['exchange_time'] = exchange['time'] 1616 plan[x][index]['exchange_desc'] = exchange['description'] 1617 valid = valid or below_nisab >= nisab 1618 if debug: 1619 print(f"below_nisab({below_nisab}) >= nisab({nisab})") 
1620 return valid, brief, plan
Check the eligibility for Zakat based on the given parameters.
Parameters:
    silver_gram_price (float): The price of a gram of silver.
    unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, it will be calculated based on the silver_gram_price.
    debug (bool): Flag to enable debug mode.
    now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time().
    cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle().
Returns: tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, and a dictionary containing the Zakat plan.
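The per-box arithmetic inside `check` can be sketched in isolation: count the completed cycles since the box was created (or since Zakat was last applied to it), then apply the cut once per cycle to what is left. The 2.5% rate used for `ZAKAT_RATE` is an assumption about what `ZakatTracker.ZakatCut` returns, and `zakat_due` is a hypothetical helper. `now`, `box_time`, `last` and `cycle` only need to share one time unit.

```python
from math import floor

ZAKAT_RATE = 0.025  # assumption: ZakatCut takes 2.5% of the amount


def zakat_due(rest: float, box_time: int, now: int, cycle: float,
              last: int = 0) -> tuple[int, float]:
    """Return (completed cycles, total Zakat due) for a single box."""
    start = last if last > 0 else box_time  # a previous Zakat resets the clock
    epochs = floor((now - start) / cycle)
    total = 0.0
    for _ in range(epochs):
        # Each cycle is charged on what remains after the earlier cycles.
        total += ZAKAT_RATE * (rest - total)
    return epochs, total


# A box of 1000 that is 2.5 cycles old owes 25 + 24.375.
print(zakat_due(1000.0, box_time=0, now=250, cycle=100.0))
```

A box younger than one full cycle yields zero epochs and is skipped, which matches the `if epoch == 0: continue` branch in the listing.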
    def build_payment_parts(self, demand: float, positive_only: bool = True) -> dict:
        """
        Build payment parts for the Zakat distribution.

        Parameters:
            demand (float): The total demand for payment in local currency.
            positive_only (bool): If True, only consider accounts with positive balance. Default is True.

        Returns:
            dict: A dictionary containing the payment parts for each account. The dictionary has the following structure:
            {
                'account': {
                    'account_id': {'balance': float, 'rate': float, 'part': float},
                    ...
                },
                'exceed': bool,
                'demand': float,
                'total': float,
            }
        """
        total = 0
        parts = {
            'account': {},
            'exceed': False,
            'demand': demand,
        }
        for x, y in self.accounts().items():
            if positive_only and y <= 0:
                continue
            total += float(y)
            exchange = self.exchange(x)
            parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0}
        parts['total'] = total
        return parts
Build payment parts for the Zakat distribution.
Parameters: demand (float): The total demand for payment in local currency. positive_only (bool): If True, only consider accounts with positive balance. Default is True.
Returns:
    dict: A dictionary containing the payment parts for each account. The dictionary has the following structure:
    {
        'account': {
            'account_id': {'balance': float, 'rate': float, 'part': float},
            ...
        },
        'exceed': bool,
        'demand': float,
        'total': float,
    }
1657 @staticmethod 1658 def check_payment_parts(parts: dict, debug: bool = False) -> int: 1659 """ 1660 Checks the validity of payment parts. 1661 1662 Parameters: 1663 parts (dict): A dictionary containing payment parts information. 1664 debug (bool): Flag to enable debug mode. 1665 1666 Returns: 1667 int: Returns 0 if the payment parts are valid, otherwise returns the error code. 1668 1669 Error Codes: 1670 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 1671 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 1672 3: 'part' value in parts['account'][x] is less than 0. 1673 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 1674 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 1675 6: The sum of 'part' values in parts['account'] does not match with 'demand' value. 1676 """ 1677 if debug: 1678 print('check_payment_parts', f'debug={debug}') 1679 for i in ['demand', 'account', 'total', 'exceed']: 1680 if i not in parts: 1681 return 1 1682 exceed = parts['exceed'] 1683 for x in parts['account']: 1684 for j in ['balance', 'rate', 'part']: 1685 if j not in parts['account'][x]: 1686 return 2 1687 if parts['account'][x]['part'] < 0: 1688 return 3 1689 if not exceed and parts['account'][x]['balance'] <= 0: 1690 return 4 1691 demand = parts['demand'] 1692 z = 0 1693 for _, y in parts['account'].items(): 1694 if not exceed and y['part'] > y['balance']: 1695 return 5 1696 z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1) 1697 z = round(z, 2) 1698 demand = round(demand, 2) 1699 if debug: 1700 print('check_payment_parts', f'z = {z}, demand = {demand}') 1701 print('check_payment_parts', type(z), type(demand)) 1702 print('check_payment_parts', z != demand) 1703 print('check_payment_parts', str(z) != str(demand)) 1704 if z != demand and str(z) != str(demand): 1705 return 6 1706 return 0
Checks the validity of payment parts.
Parameters: parts (dict): A dictionary containing payment parts information. debug (bool): Flag to enable debug mode.
Returns: int: Returns 0 if the payment parts are valid, otherwise returns the error code.
Error Codes:
    1: 'demand', 'account', 'total', or 'exceed' key is missing in parts.
    2: 'balance', 'rate' or 'part' key is missing in parts['account'][x].
    3: 'part' value in parts['account'][x] is less than 0.
    4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0.
    5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value.
    6: The sum of 'part' values in parts['account'] does not match with 'demand' value.
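The error codes can be exercised with a standalone validator that follows the same order of checks. This is a simplified sketch rather than the library's method: `validate_parts` is a hypothetical name, the conversion to local currency is assumed to be `part * rate`, and the final comparison uses only rounded floats.

```python
def validate_parts(parts: dict) -> int:
    """Return 0 when `parts` is consistent, otherwise an error code from 1 to 6."""
    for key in ('demand', 'account', 'total', 'exceed'):
        if key not in parts:
            return 1
    exceed = parts['exceed']
    for entry in parts['account'].values():
        for key in ('balance', 'rate', 'part'):
            if key not in entry:
                return 2
        if entry['part'] < 0:
            return 3
        if not exceed and entry['balance'] <= 0:
            return 4
    paid = 0.0
    for entry in parts['account'].values():
        if not exceed and entry['part'] > entry['balance']:
            return 5
        paid += entry['part'] * entry['rate']  # assumed conversion to local currency
    if round(paid, 2) != round(parts['demand'], 2):
        return 6
    return 0


parts = {
    'account': {
        'cash': {'balance': 1000, 'rate': 1, 'part': 25},
        'usd': {'balance': 100, 'rate': 3.75, 'part': 20},
    },
    'exceed': False,
    'demand': 100,
    'total': 1100,
}
print(validate_parts(parts))  # 0, because 25 * 1 + 20 * 3.75 == 100
```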
1708 def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool: 1709 """ 1710 Perform Zakat calculation based on the given report and optional parts. 1711 1712 Parameters: 1713 report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. 1714 parts (dict): A dictionary containing the payment parts for the zakat. 1715 debug (bool): A flag indicating whether to print debug information. 1716 1717 Returns: 1718 bool: True if the zakat calculation is successful, False otherwise. 1719 """ 1720 if debug: 1721 print('zakat', f'debug={debug}') 1722 valid, _, plan = report 1723 if not valid: 1724 return valid 1725 parts_exist = parts is not None 1726 if parts_exist: 1727 if self.check_payment_parts(parts, debug=debug) != 0: 1728 return False 1729 if debug: 1730 print('######### zakat #######') 1731 print('parts_exist', parts_exist) 1732 no_lock = self.nolock() 1733 self.lock() 1734 report_time = self.time() 1735 self._vault['report'][report_time] = report 1736 self._step(Action.REPORT, ref=report_time) 1737 created = self.time() 1738 for x in plan: 1739 target_exchange = self.exchange(x) 1740 if debug: 1741 print(plan[x]) 1742 print('-------------') 1743 print(self._vault['account'][x]['box']) 1744 ids = sorted(self._vault['account'][x]['box'].keys()) 1745 if debug: 1746 print('plan[x]', plan[x]) 1747 for i in plan[x].keys(): 1748 j = ids[i] 1749 if debug: 1750 print('i', i, 'j', j) 1751 self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'], 1752 key='last', 1753 math_operation=MathOperation.EQUAL) 1754 self._vault['account'][x]['box'][j]['last'] = created 1755 amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate'])) 1756 self._vault['account'][x]['box'][j]['total'] += amount 1757 self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total', 1758 math_operation=MathOperation.ADDITION) 1759 
self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count'] 1760 self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count', 1761 math_operation=MathOperation.ADDITION) 1762 if not parts_exist: 1763 try: 1764 self._vault['account'][x]['box'][j]['rest'] -= amount 1765 except TypeError: 1766 self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount) 1767 # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest', 1768 # math_operation=MathOperation.SUBTRACTION) 1769 self._log(-float(amount), desc='zakat-زكاة', account=x, created=None, ref=j, debug=debug) 1770 if parts_exist: 1771 for account, part in parts['account'].items(): 1772 if part['part'] == 0: 1773 continue 1774 if debug: 1775 print('zakat-part', account, part['rate']) 1776 target_exchange = self.exchange(account) 1777 amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate']) 1778 self.sub(amount, desc='zakat-part-دفعة-زكاة', account=account, debug=debug) 1779 if no_lock: 1780 self.free(self.lock()) 1781 return True
Perform Zakat calculation based on the given report and optional parts.
Parameters:
    report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan.
    parts (dict): A dictionary containing the payment parts for the zakat.
    debug (bool): A flag indicating whether to print debug information.
Returns: bool: True if the zakat calculation is successful, False otherwise.
    def export_json(self, path: str = "data.json") -> bool:
        """
        Exports the current state of the ZakatTracker object to a JSON file.

        Parameters:
            path (str): The path where the JSON file will be saved. Default is "data.json".

        Returns:
            bool: True if the export is successful, False otherwise.

        Raises:
            No specific exceptions are raised by this method.
        """
        with open(path, "w") as file:
            json.dump(self._vault, file, indent=4, cls=JSONEncoder)
            return True
Exports the current state of the ZakatTracker object to a JSON file.
Parameters: path (str): The path where the JSON file will be saved. Default is "data.json".
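Returns: bool: True if the export is successful. The listed implementation never returns False; a failure surfaces as an exception instead.
Raises: No exceptions are raised explicitly by this method. Errors from opening or writing the file (for example OSError) propagate to the caller.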
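`export_json` passes a custom `JSONEncoder` to `json.dump`, presumably because the vault holds values that the standard encoder rejects. The sketch below shows that general technique with a hypothetical `SketchEncoder`. Which types the library's own encoder handles is not shown in this section, so the `Decimal` and `Enum` cases are assumptions.

```python
import json
from decimal import Decimal
from enum import Enum


class SketchEncoder(json.JSONEncoder):
    """Hypothetical encoder: teaches json how to write Decimal and Enum values."""

    def default(self, o):
        if isinstance(o, Decimal):
            return float(o)
        if isinstance(o, Enum):
            return o.name
        return super().default(o)  # anything else still raises TypeError


print(json.dumps({'rate': Decimal('3.75')}, cls=SketchEncoder))  # {"rate": 3.75}
```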
    def save(self, path: str = None) -> bool:
        """
        Saves the ZakatTracker's current state to a camel file.

        This method serializes the internal data (`_vault`).

        Parameters:
            path (str, optional): File path for saving. Defaults to a predefined location.

        Returns:
            bool: True if the save operation is successful, False otherwise.
        """
        if path is None:
            path = self.path()
        with open(f'{path}.tmp', 'w') as stream:
            # first save in tmp file
            stream.write(camel.dump(self._vault))
        # then move tmp file to original location
        shutil.move(f'{path}.tmp', path)
        return True
Saves the ZakatTracker's current state to a camel file.
This method serializes the internal data (`_vault`).
Parameters: path (str, optional): File path for saving. Defaults to a predefined location.
Returns: bool: True if the save operation is successful, False otherwise.
    def load(self, path: str = None) -> bool:
        """
        Load the current state of the ZakatTracker object from a camel file.

        Parameters:
            path (str): The path where the camel file is located. If not provided, it will use the default path.

        Returns:
            bool: True if the load operation is successful, False otherwise.
        """
        if path is None:
            path = self.path()
        if os.path.exists(path):
            with open(path, 'r') as stream:
                self._vault = camel.load(stream.read())
                return True
        return False
Load the current state of the ZakatTracker object from a camel file.
Parameters: path (str): The path where the camel file is located. If not provided, it will use the default path.
Returns: bool: True if the load operation is successful, False otherwise.
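The write-to-temporary-file-then-move pattern used by `save`, together with the existence check in `load`, can be sketched with the standard library alone. JSON stands in for the camel format here, and `save_state` and `load_state` are hypothetical names; the sketch illustrates the pattern and is not the library's serializer.

```python
import json
import os
import shutil
from typing import Optional


def save_state(state: dict, path: str) -> bool:
    """Write to '<path>.tmp' first, then move it over the real file."""
    tmp_path = f'{path}.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as stream:
        json.dump(state, stream)
    # Moving only after the file is closed keeps a half-written file
    # from ever replacing the previous good copy.
    shutil.move(tmp_path, path)
    return True


def load_state(path: str) -> Optional[dict]:
    """Return the stored state, or None when the file does not exist."""
    if not os.path.exists(path):
        return None
    with open(path, 'r', encoding='utf-8') as stream:
        return json.load(stream)
```

If the process is interrupted during the write, only the `.tmp` file is affected and the last saved state remains loadable.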
    def import_csv_cache_path(self):
        """
        Generates the cache file path for imported CSV data.

        This function constructs the file path where cached data from CSV imports
        will be stored. The cache file is a camel file (.camel extension) appended
        to the base path of the object.

        Returns:
            str: The full path to the import CSV cache file.

        Example:
            >>> obj = ZakatTracker('/data/reports')
            >>> obj.import_csv_cache_path()
            '/data/reports.import_csv.camel'
        """
        path = str(self.path())
        ext = self.ext()
        ext_len = len(ext)
        if path.endswith(f'.{ext}'):
            path = path[:-ext_len-1]
        _, filename = os.path.split(path + f'.import_csv.{ext}')
        return self.base_path(filename)
Generates the cache file path for imported CSV data.
This function constructs the file path where cached data from CSV imports will be stored. The cache file is a camel file (.camel extension) appended to the base path of the object.
Returns: str: The full path to the import CSV cache file.
Example:
>>> obj = ZakatTracker('/data/reports')
>>> obj.import_csv_cache_path()
'/data/reports.import_csv.camel'
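The naming rule in the example above amounts to: drop the database extension if it is present, then append `.import_csv.<ext>`. A minimal sketch of just that string handling follows. `cache_path` is a hypothetical helper, and it skips the final `base_path` resolution that the listed method performs.

```python
def cache_path(db_path: str, ext: str = 'camel') -> str:
    """Derive the CSV import cache name from a database path."""
    suffix = f'.{ext}'
    if db_path.endswith(suffix):
        db_path = db_path[:-len(suffix)]  # strip the existing '.camel'
    return f'{db_path}.import_csv.{ext}'


print(cache_path('/data/reports'))      # /data/reports.import_csv.camel
print(cache_path('/data/zakat.camel'))  # /data/zakat.import_csv.camel
```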
1863 def import_csv(self, path: str = 'file.csv', debug: bool = False) -> tuple: 1864 """ 1865 The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system. 1866 1867 Parameters: 1868 path (str): The path to the CSV file. Default is 'file.csv'. 1869 debug (bool): A flag indicating whether to print debug information. 1870 1871 Returns: 1872 tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, 1873 and a dictionary of bad transactions. 1874 1875 Notes: 1876 * Currency Pair Assumption: This function assumes that the exchange rates stored for each account 1877 are appropriate for the currency pairs involved in the conversions. 1878 * The exchange rate for each account is based on the last encountered transaction rate that is not equal 1879 to 1.0 or the previous rate for that account. 1880 * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent 1881 transactions of the same account within the whole imported and existing dataset when doing `check` and 1882 `zakat` operations. 
1883 1884 Example Usage: 1885 The CSV file should have the following format, rate is optional per transaction: 1886 account, desc, value, date, rate 1887 For example: 1888 safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1 1889 """ 1890 if debug: 1891 print('import_csv', f'debug={debug}') 1892 cache: list[int] = [] 1893 try: 1894 with open(self.import_csv_cache_path(), 'r') as stream: 1895 cache = camel.load(stream.read()) 1896 except: 1897 pass 1898 date_formats = [ 1899 "%Y-%m-%d %H:%M:%S", 1900 "%Y-%m-%dT%H:%M:%S", 1901 "%Y-%m-%dT%H%M%S", 1902 "%Y-%m-%d", 1903 ] 1904 created, found, bad = 0, 0, {} 1905 data: dict[int, list] = {} 1906 with open(path, newline='', encoding="utf-8") as f: 1907 i = 0 1908 for row in csv.reader(f, delimiter=','): 1909 i += 1 1910 hashed = hash(tuple(row)) 1911 if hashed in cache: 1912 found += 1 1913 continue 1914 account = row[0] 1915 desc = row[1] 1916 value = float(row[2]) 1917 rate = 1.0 1918 if row[4:5]: # Empty list if index is out of range 1919 rate = float(row[4]) 1920 date: int = 0 1921 for time_format in date_formats: 1922 try: 1923 date = self.time(datetime.datetime.strptime(row[3], time_format)) 1924 break 1925 except: 1926 pass 1927 # TODO: not allowed for negative dates in the future after enhance time functions 1928 if date == 0 or value == 0: 1929 bad[i] = row + ('invalid date',) 1930 continue 1931 if date not in data: 1932 data[date] = [] 1933 data[date].append((i, account, desc, value, date, rate, hashed)) 1934 1935 if debug: 1936 print('import_csv', len(data)) 1937 1938 if bad: 1939 return created, found, bad 1940 1941 for date, rows in sorted(data.items()): 1942 try: 1943 len_rows = len(rows) 1944 if len_rows == 1: 1945 (_, account, desc, value, date, rate, hashed) = rows[0] 1946 if rate > 0: 1947 self.exchange(account, created=date, rate=rate) 1948 if value > 0: 1949 self.track(value, desc, account, True, date) 1950 elif value < 0: 1951 self.sub(-value, desc, account, date) 1952 created += 1 1953 
cache.append(hashed) 1954 continue 1955 if debug: 1956 print('-- Duplicated time detected', date, 'len', len_rows) 1957 print(rows) 1958 print('---------------------------------') 1959 # If records are found at the same time with different accounts in the same amount 1960 # (one positive and the other negative), this indicates it is a transfer. 1961 if len_rows != 2: 1962 raise Exception(f'more than two transactions({len_rows}) at the same time') 1963 (i, account1, desc1, value1, date1, rate1, _) = rows[0] 1964 (j, account2, desc2, value2, date2, rate2, _) = rows[1] 1965 if account1 == account2 or desc1 != desc2 or abs(value1) != abs(value2) or date1 != date2: 1966 raise Exception('invalid transfer') 1967 if rate1 > 0: 1968 self.exchange(account1, created=date1, rate=rate1) 1969 if rate2 > 0: 1970 self.exchange(account2, created=date2, rate=rate2) 1971 values = { 1972 value1: account1, 1973 value2: account2, 1974 } 1975 self.transfer( 1976 unscaled_amount=abs(value1), 1977 from_account=values[min(values.keys())], 1978 to_account=values[max(values.keys())], 1979 desc=desc1, 1980 created=date1, 1981 ) 1982 except Exception as e: 1983 for (i, account, desc, value, date, rate, _) in rows: 1984 bad[i] = (account, desc, value, date, rate, e) 1985 break 1986 with open(self.import_csv_cache_path(), 'w') as stream: 1987 stream.write(camel.dump(cache)) 1988 return created, found, bad
The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system.
Parameters: path (str): The path to the CSV file. Default is 'file.csv'. debug (bool): A flag indicating whether to print debug information.
Returns: tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, and a dictionary of bad transactions.
Notes:
* Currency Pair Assumption: This function assumes that the exchange rates stored for each account are appropriate for the currency pairs involved in the conversions.
* The exchange rate for each account is based on the last encountered transaction rate that is not equal to 1.0, or on the previous rate for that account.
* These rates are merged into the main exchange-rate data and are then used for all subsequent transactions of the same account, across both the imported and the existing dataset, when running the check and zakat operations.
Example Usage: The CSV file should have the following format (rate is optional per transaction):
    account, desc, value, date, rate
For example:
    safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1
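The row format above can be illustrated with a small standalone sketch. It only mirrors the parsing steps visible in the source listing (optional rate column, date formats tried in order); `parse_row` is a hypothetical helper that is not part of the library, and stripping whitespace around the date is an assumption made here for robustness.

```python
import csv
import datetime
import io

# Date formats tried in order, as listed in the import_csv source.
DATE_FORMATS = [
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%dT%H%M%S",
    "%Y-%m-%d",
]

def parse_row(row):
    """Hypothetical helper: split one CSV row into typed fields."""
    account, desc, value = row[0], row[1], float(row[2])
    # The rate column is optional; default to 1.0 when it is absent.
    rate = float(row[4]) if row[4:5] else 1.0
    date = None
    for fmt in DATE_FORMATS:
        try:
            # Assumption: surrounding whitespace is stripped before parsing.
            date = datetime.datetime.strptime(row[3].strip(), fmt)
            break
        except ValueError:
            continue
    return account, desc, value, date, rate

sample = 'safe-45,"Some text",34872,1988-06-30 00:00:00,1\n'
rows = list(csv.reader(io.StringIO(sample)))
parsed = parse_row(rows[0])
print(parsed)
```

A row whose date matches none of the formats comes back with `date` set to `None` in this sketch; the library itself reports such rows in its dictionary of bad transactions.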
    @staticmethod
    def human_readable_size(size: float, decimal_places: int = 2) -> str:
        """
        Converts a size in bytes to a human-readable format (e.g., KB, MB, GB).

        This function iterates through progressively larger units of information
        (B, KB, MB, GB, etc.) and divides the input size until it fits within a
        range that can be expressed with a reasonable number before the unit.

        Parameters:
        size (float): The size in bytes to convert.
        decimal_places (int, optional): The number of decimal places to display
            in the result. Defaults to 2.

        Returns:
        str: A string representation of the size in a human-readable format,
            rounded to the specified number of decimal places. For example:
                - "1.50 KB" (1536 bytes)
                - "23.00 MB" (24117248 bytes)
                - "1.23 GB" (1325899906 bytes)
        """
        if type(size) not in (float, int):
            raise TypeError("size must be a float or integer")
        if type(decimal_places) != int:
            raise TypeError("decimal_places must be an integer")
        for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']:
            if size < 1024.0:
                break
            size /= 1024.0
        return f"{size:.{decimal_places}f} {unit}"
Converts a size in bytes to a human-readable format (e.g., KB, MB, GB).
This function iterates through progressively larger units of information (B, KB, MB, GB, etc.) and divides the input size until it fits within a range that can be expressed with a reasonable number before the unit.
Parameters: size (float): The size in bytes to convert. decimal_places (int, optional): The number of decimal places to display in the result. Defaults to 2.
Returns: str: A string representation of the size in a human-readable format, rounded to the specified number of decimal places. For example:
- "1.50 KB" (1536 bytes)
- "23.00 MB" (24117248 bytes)
- "1.23 GB" (1325899906 bytes)
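As a quick check of the examples listed under Returns, the sketch below re-implements the same unit-scaling loop as a standalone function. `readable_size` is a hypothetical name used so the snippet runs without the library, and the type checks are left out for brevity.

```python
UNITS = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']

def readable_size(size: float, decimal_places: int = 2) -> str:
    """Standalone sketch of the unit-scaling loop (hypothetical name)."""
    unit = UNITS[0]
    for unit in UNITS:
        # Stop at the first unit in which the value is below 1024.
        if size < 1024.0:
            break
        size /= 1024.0
    return f"{size:.{decimal_places}f} {unit}"

print(readable_size(1536))        # 1.50 KB
print(readable_size(24117248))    # 23.00 MB
print(readable_size(1325899906))  # 1.23 GB
```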
    @staticmethod
    def get_dict_size(obj: dict, seen: set = None) -> float:
        """
        Recursively calculates the approximate memory size of a dictionary and its contents in bytes.

        This function traverses the dictionary structure, accounting for the size of keys, values,
        and any nested objects. It handles various data types commonly found in dictionaries
        (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case
        of circular references.

        Parameters:
        obj (dict): The dictionary whose size is to be calculated.
        seen (set, optional): A set used internally to track visited objects
                             and avoid circular references. Defaults to None.

        Returns:
        float: An approximate size of the dictionary and its contents in bytes.

        Note:
        - This function is a method of the `ZakatTracker` class and is likely used to
          estimate the memory footprint of data structures relevant to Zakat calculations.
        - The size calculation is approximate as it relies on `sys.getsizeof()`, which might
          not account for all memory overhead depending on the Python implementation.
        - Circular references are handled to prevent infinite recursion.
        - Basic numeric types (int, float, complex) are assumed to have fixed sizes.
        - String sizes are estimated based on character length and encoding.
        """
        size = 0
        if seen is None:
            seen = set()

        obj_id = id(obj)
        if obj_id in seen:
            return 0

        seen.add(obj_id)
        size += sys.getsizeof(obj)

        if isinstance(obj, dict):
            for k, v in obj.items():
                size += ZakatTracker.get_dict_size(k, seen)
                size += ZakatTracker.get_dict_size(v, seen)
        elif isinstance(obj, (list, tuple, set, frozenset)):
            for item in obj:
                size += ZakatTracker.get_dict_size(item, seen)
        elif isinstance(obj, (int, float, complex)):  # Handle numbers
            pass  # Basic numbers have a fixed size, so nothing to add here
        elif isinstance(obj, str):  # Handle strings
            # Adds len(obj) times the size of an empty bytes object on top of
            # sys.getsizeof(obj): a rough over-estimate, not a true per-character size
            size += len(obj) * sys.getsizeof(str().encode())
        return size
Recursively calculates the approximate memory size of a dictionary and its contents in bytes.
This function traverses the dictionary structure, accounting for the size of keys, values, and any nested objects. It handles various data types commonly found in dictionaries (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case of circular references.
Parameters: obj (dict): The dictionary whose size is to be calculated. seen (set, optional): A set used internally to track visited objects and avoid circular references. Defaults to None.
Returns: float: An approximate size of the dictionary and its contents in bytes.
Note:
- This function is a method of the ZakatTracker class and is likely used to estimate the memory footprint of data structures relevant to Zakat calculations.
- The size calculation is approximate as it relies on sys.getsizeof(), which might not account for all memory overhead depending on the Python implementation.
- Circular references are handled to prevent infinite recursion.
- Basic numeric types (int, float, complex) are assumed to have fixed sizes.
- String sizes are estimated based on character length and encoding.
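The circular-reference handling mentioned in the notes can be sketched as follows. This is a simplified standalone version, not the library's method: `approx_size` is a hypothetical name, and the extra per-character term for strings is left out.

```python
import sys

def approx_size(obj, seen=None):
    """Simplified sketch: sum sys.getsizeof over a nested structure."""
    if seen is None:
        seen = set()
    # Objects are tracked by identity so each one is counted only once.
    if id(obj) in seen:
        return 0
    seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        for key, value in obj.items():
            size += approx_size(key, seen) + approx_size(value, seen)
    elif isinstance(obj, (list, tuple, set, frozenset)):
        for item in obj:
            size += approx_size(item, seen)
    return size

data = {"a": [1, 2, 3]}
data["self"] = data  # circular reference: the dict contains itself
total = approx_size(data)
print(total > sys.getsizeof(data))
```

Without the `seen` set, the self-reference would recurse until Python raised a `RecursionError`; with it, the second visit to `data` contributes zero and the traversal ends.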
    @staticmethod
    def duration_from_nanoseconds(ns: int,
                                  show_zeros_in_spoken_time: bool = False,
                                  spoken_time_separator=',',
                                  millennia: str = 'Millennia',
                                  century: str = 'Century',
                                  years: str = 'Years',
                                  days: str = 'Days',
                                  hours: str = 'Hours',
                                  minutes: str = 'Minutes',
                                  seconds: str = 'Seconds',
                                  milli_seconds: str = 'MilliSeconds',
                                  micro_seconds: str = 'MicroSeconds',
                                  nano_seconds: str = 'NanoSeconds',
                                  ) -> tuple:
        """
        REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106
        Convert nanoseconds to a human-readable time format.
        A nanosecond is a unit of time in the International System of Units (SI) equal
        to one billionth (0.000000001 or 10^-9 or 1/1,000,000,000) of a second.
        Its symbol is ns.
        A microsecond is equal to 1000 nanoseconds or 1/1,000 of a millisecond.

        INPUT : ns (nanoseconds)
        OUTPUT: tuple(string time_lapsed, string spoken_time)
        OUTPUT Variables: time_lapsed, spoken_time

        Example Input: duration_from_nanoseconds(1234567890123456789012)
        **"Millennium:Century:Years:Days:Hours:Minutes:Seconds::MilliSeconds::MicroSeconds::NanoSeconds"**
        Example Output: ('039:0001:047:325:05:02:03::456::789::012', ' 39 Millennia,   1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds')
        """
        # Peel off each unit from smallest to largest; a year is counted as 365 days
        us, ns = divmod(ns, 1000)
        ms, us = divmod(us, 1000)
        s, ms = divmod(ms, 1000)
        m, s = divmod(s, 60)
        h, m = divmod(m, 60)
        d, h = divmod(h, 24)
        y, d = divmod(d, 365)
        c, y = divmod(y, 100)
        n, c = divmod(c, 10)
        time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}"
        spoken_time_part = []
        if n > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{n: 3d} {millennia}")
        if c > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{c: 4d} {century}")
        if y > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{y: 3d} {years}")
        if d > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{d: 4d} {days}")
        if h > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{h: 2d} {hours}")
        if m > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{m: 2d} {minutes}")
        if s > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{s: 2d} {seconds}")
        if ms > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{ms: 3d} {milli_seconds}")
        if us > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{us: 3d} {micro_seconds}")
        if ns > 0 or show_zeros_in_spoken_time:
            spoken_time_part.append(f"{ns: 3d} {nano_seconds}")
        return time_lapsed, spoken_time_separator.join(spoken_time_part)
REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 Convert nanoseconds to a human-readable time format. A nanosecond is a unit of time in the International System of Units (SI) equal to one billionth (0.000000001 or 10^-9 or 1/1,000,000,000) of a second. Its symbol is ns. A microsecond is equal to 1000 nanoseconds or 1/1,000 of a millisecond. Years are counted as 365 days.
INPUT : ns (nanoseconds) OUTPUT: tuple(string time_lapsed, string spoken_time). OUTPUT Variables: time_lapsed, spoken_time
Example Input: duration_from_nanoseconds(1234567890123456789012)
Format: "Millennium:Century:Years:Days:Hours:Minutes:Seconds::MilliSeconds::MicroSeconds::NanoSeconds" (the source joins the sub-second fields with a double colon)
Example Output: ('039:0001:047:325:05:02:03::456::789::012', ' 39 Millennia,   1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds')
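The conversion rests on a chain of `divmod` calls, each peeling one unit off the remaining total. The sketch below reproduces only that chain; `split_nanoseconds` and the dictionary keys are hypothetical names chosen for illustration, and the string formatting is omitted.

```python
def split_nanoseconds(ns: int) -> dict:
    """Sketch of the divmod chain: break a nanosecond count into units."""
    us, ns = divmod(ns, 1000)   # microseconds, leftover nanoseconds
    ms, us = divmod(us, 1000)   # milliseconds, leftover microseconds
    s, ms = divmod(ms, 1000)    # seconds, leftover milliseconds
    m, s = divmod(s, 60)
    h, m = divmod(m, 60)
    d, h = divmod(h, 24)
    y, d = divmod(d, 365)       # a year is treated as 365 days
    c, y = divmod(y, 100)
    n, c = divmod(c, 10)
    return {
        "millennia": n, "centuries": c, "years": y, "days": d,
        "hours": h, "minutes": m, "seconds": s,
        "milliseconds": ms, "microseconds": us, "nanoseconds": ns,
    }

parts = split_nanoseconds(1234567890123456789012)
print(parts)
```

For the documented input this yields 39 millennia, 1 century, 47 years, 325 days, 5 hours, 2 minutes, 3 seconds, 456 milliseconds, 789 microseconds and 12 nanoseconds, matching the example output.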
    @staticmethod
    def day_to_time(day: int, month: int = 6, year: int = 2024) -> int:  # Assume the month is June and the year is 2024
        """
        Convert a specific day, month, and year into a timestamp.

        Parameters:
        day (int): The day of the month.
        month (int): The month of the year. Default is 6 (June).
        year (int): The year. Default is 2024.

        Returns:
        int: The timestamp representing the given day, month, and year.

        Note:
        This method assumes the default month and year if not provided.
        """
        return ZakatTracker.time(datetime.datetime(year, month, day))
Convert a specific day, month, and year into a timestamp.
Parameters: day (int): The day of the month. month (int): The month of the year. Default is 6 (June). year (int): The year. Default is 2024.
Returns: int: The timestamp representing the given day, month, and year.
Note: This method assumes the default month and year if not provided.
    @staticmethod
    def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime:
        """
        Generate a random date between two given dates.

        Parameters:
        start_date (datetime.datetime): The start date from which to generate a random date.
        end_date (datetime.datetime): The end date until which to generate a random date.

        Returns:
        datetime.datetime: A random date between the start_date and end_date.
        """
        time_between_dates = end_date - start_date
        days_between_dates = time_between_dates.days
        random_number_of_days = random.randrange(days_between_dates)
        return start_date + datetime.timedelta(days=random_number_of_days)
Generate a random date between two given dates.
Parameters: start_date (datetime.datetime): The start date from which to generate a random date. end_date (datetime.datetime): The end date until which to generate a random date.
Returns: datetime.datetime: A random date between the start_date and end_date.
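A minimal sketch of the same idea, with two details made explicit: the offset is a whole number of days, so the result keeps the time of day of the start date, and `randrange` excludes its upper bound, so the end date itself is never returned. `random_date` and its `rng` parameter are hypothetical; the guard for ranges shorter than one day is an assumption added here, since `randrange(0)` raises `ValueError`.

```python
import datetime
import random

def random_date(start, end, rng=random):
    """Sketch: pick a random whole-day offset between start and end."""
    days_between = (end - start).days
    # Assumption: fall back to the start date when the range is under one day.
    if days_between < 1:
        return start
    return start + datetime.timedelta(days=rng.randrange(days_between))

start = datetime.datetime(2020, 1, 1)
end = datetime.datetime(2020, 12, 31)
rng = random.Random(42)  # seeded generator for repeatable samples
samples = [random_date(start, end, rng) for _ in range(5)]
print(samples)
```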
    @staticmethod
    def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False,
                                 debug: bool = False) -> int:
        """
        Generate a random CSV file with specified parameters.

        Parameters:
        path (str): The path where the CSV file will be saved. Default is "data.csv".
        count (int): The number of rows to generate in the CSV file. Default is 1000.
        with_rate (bool): If True, a random rate between 0.12 and 12.0 (a random integer
            from 1 to 100 multiplied by 0.12) is appended to each row. Default is False.
        debug (bool): A flag indicating whether to print debug information.

        Returns:
        int: The number of rows written to the CSV file at the specified path.
            Each row contains a randomly generated account, description, value, and date.
            The value is randomly generated between 1000 and 100000,
            and the date is randomly generated from 1000-01-01 up to, but not including, 2023-12-31.
            If the zero-based row index is not divisible by 13, the value is multiplied by -1.
        """
        if debug:
            print('generate_random_csv_file', f'debug={debug}')
        written = 0  # counts rows actually written, so count=0 returns 0
        with open(path, "w", newline="") as csvfile:
            writer = csv.writer(csvfile)
            for i in range(count):
                account = f"acc-{random.randint(1, 1000)}"
                desc = f"Some text {random.randint(1, 1000)}"
                value = random.randint(1000, 100000)
                date = ZakatTracker.generate_random_date(datetime.datetime(1000, 1, 1),
                                                         datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S")
                if i % 13 != 0:
                    value *= -1
                row = [account, desc, value, date]
                if with_rate:
                    rate = random.randint(1, 100) * 0.12
                    if debug:
                        print('before-append', row)
                    row.append(rate)
                    if debug:
                        print('after-append', row)
                writer.writerow(row)
                written += 1
        return written
Generate a random CSV file with specified parameters.
Parameters: path (str): The path where the CSV file will be saved. Default is "data.csv". count (int): The number of rows to generate in the CSV file. Default is 1000. with_rate (bool): If True, a random rate between 0.12 and 12.0 (a random integer from 1 to 100 multiplied by 0.12) is appended to each row. Default is False. debug (bool): A flag indicating whether to print debug information.
Returns: int: The number of rows written. The function generates a CSV file at the specified path with the given count of rows. Each row contains a randomly generated account, description, value, and date. The value is randomly generated between 1000 and 100000, and the date is randomly generated from 1000-01-01 up to, but not including, 2023-12-31. If the zero-based row index is not divisible by 13, the value is multiplied by -1.
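The sign rule and the optional rate column can be seen in isolation in the sketch below. It builds rows in memory instead of writing a file; `random_rows` is a hypothetical helper, and the fixed date string is a placeholder assumption standing in for the randomly generated date.

```python
import random

def random_rows(count, with_rate=False, seed=None):
    """Sketch: build rows shaped like the generated CSV, in memory."""
    rng = random.Random(seed)
    rows = []
    for i in range(count):
        value = rng.randint(1000, 100000)
        # Only every 13th row (index 0, 13, 26, ...) stays positive.
        if i % 13 != 0:
            value = -value
        row = [
            f"acc-{rng.randint(1, 1000)}",
            f"Some text {rng.randint(1, 1000)}",
            value,
            "2000-01-01 00:00:00",  # placeholder date (assumption)
        ]
        if with_rate:
            row.append(rng.randint(1, 100) * 0.12)
        rows.append(row)
    return rows

rows = random_rows(27, with_rate=True, seed=1)
positive_indexes = [i for i, row in enumerate(rows) if row[2] > 0]
print(positive_indexes)
```

With 27 rows, only indexes 0, 13 and 26 carry positive values, so roughly twelve out of every thirteen generated transactions are withdrawals.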
    @staticmethod
    def create_random_list(max_sum, min_value=0, max_value=10):
        """
        Creates a list of random integers whose sum does not exceed the specified maximum.

        Args:
            max_sum: The maximum allowed sum of the list elements.
            min_value: The minimum possible value for an element (inclusive).
            max_value: The maximum possible value for an element (inclusive).

        Returns:
            A list of random integers.
        """
        result = []
        current_sum = 0

        while current_sum < max_sum:
            # Calculate the remaining space for the next element
            remaining_sum = max_sum - current_sum
            # Determine the maximum possible value for the next element
            next_max_value = min(remaining_sum, max_value)
            # Generate a random element within the allowed range
            next_element = random.randint(min_value, next_max_value)
            result.append(next_element)
            current_sum += next_element

        return result
Creates a list of random integers whose sum does not exceed the specified maximum.
Args: max_sum: The maximum allowed sum of the list elements. min_value: The minimum possible value for an element (inclusive). max_value: The maximum possible value for an element (inclusive).
Returns: A list of random integers.
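Because each element is capped at the remaining headroom and the loop runs until the running total reaches `max_sum`, the list ends up summing to exactly `max_sum` for integer inputs. The standalone sketch below shows this; `random_chunks` and the `rng` parameter are hypothetical names, and `min_value=1` is used in the example so that no zero-valued chunks appear.

```python
import random

def random_chunks(max_sum, min_value=0, max_value=10, rng=random):
    """Sketch: split max_sum into random integer chunks."""
    result = []
    current_sum = 0
    while current_sum < max_sum:
        # Never draw more than what is still missing from the target.
        upper = min(max_sum - current_sum, max_value)
        element = rng.randint(min_value, upper)
        result.append(element)
        current_sum += element
    return result

chunks = random_chunks(100, min_value=1, max_value=10, rng=random.Random(0))
print(sum(chunks), len(chunks))
```

Note that a `min_value` larger than the remaining headroom makes `randint` raise `ValueError`, so small minimum values are the safe choice.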
2477 def test(self, debug: bool = False) -> bool: 2478 if debug: 2479 print('test', f'debug={debug}') 2480 try: 2481 2482 self._test_core(True, debug) 2483 self._test_core(False, debug) 2484 2485 assert self._history() 2486 2487 # Not allowed for duplicate transactions in the same account and time 2488 2489 created = ZakatTracker.time() 2490 self.track(100, 'test-1', 'same', True, created) 2491 failed = False 2492 try: 2493 self.track(50, 'test-1', 'same', True, created) 2494 except: 2495 failed = True 2496 assert failed is True 2497 2498 self.reset() 2499 2500 # Same account transfer 2501 for x in [1, 'a', True, 1.8, None]: 2502 failed = False 2503 try: 2504 self.transfer(1, x, x, 'same-account', debug=debug) 2505 except: 2506 failed = True 2507 assert failed is True 2508 2509 # Always preserve box age during transfer 2510 2511 series: list[tuple] = [ 2512 (30, 4), 2513 (60, 3), 2514 (90, 2), 2515 ] 2516 case = { 2517 3000: { 2518 'series': series, 2519 'rest': 15000, 2520 }, 2521 6000: { 2522 'series': series, 2523 'rest': 12000, 2524 }, 2525 9000: { 2526 'series': series, 2527 'rest': 9000, 2528 }, 2529 18000: { 2530 'series': series, 2531 'rest': 0, 2532 }, 2533 27000: { 2534 'series': series, 2535 'rest': -9000, 2536 }, 2537 36000: { 2538 'series': series, 2539 'rest': -18000, 2540 }, 2541 } 2542 2543 selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle() 2544 2545 for total in case: 2546 if debug: 2547 print('--------------------------------------------------------') 2548 print(f'case[{total}]', case[total]) 2549 for x in case[total]['series']: 2550 self.track( 2551 unscaled_value=x[0], 2552 desc=f"test-{x} ages", 2553 account='ages', 2554 logging=True, 2555 created=selected_time * x[1], 2556 ) 2557 2558 unscaled_total = self.unscale(total) 2559 if debug: 2560 print('unscaled_total', unscaled_total) 2561 refs = self.transfer( 2562 unscaled_amount=unscaled_total, 2563 from_account='ages', 2564 to_account='future', 2565 desc='Zakat Movement', 2566 
debug=debug, 2567 ) 2568 2569 if debug: 2570 print('refs', refs) 2571 2572 ages_cache_balance = self.balance('ages') 2573 ages_fresh_balance = self.balance('ages', False) 2574 rest = case[total]['rest'] 2575 if debug: 2576 print('source', ages_cache_balance, ages_fresh_balance, rest) 2577 assert ages_cache_balance == rest 2578 assert ages_fresh_balance == rest 2579 2580 future_cache_balance = self.balance('future') 2581 future_fresh_balance = self.balance('future', False) 2582 if debug: 2583 print('target', future_cache_balance, future_fresh_balance, total) 2584 print('refs', refs) 2585 assert future_cache_balance == total 2586 assert future_fresh_balance == total 2587 2588 # TODO: check boxes times for `ages` should equal box times in `future` 2589 for ref in self._vault['account']['ages']['box']: 2590 ages_capital = self._vault['account']['ages']['box'][ref]['capital'] 2591 ages_rest = self._vault['account']['ages']['box'][ref]['rest'] 2592 future_capital = 0 2593 if ref in self._vault['account']['future']['box']: 2594 future_capital = self._vault['account']['future']['box'][ref]['capital'] 2595 future_rest = 0 2596 if ref in self._vault['account']['future']['box']: 2597 future_rest = self._vault['account']['future']['box'][ref]['rest'] 2598 if ages_capital != 0 and future_capital != 0 and future_rest != 0: 2599 if debug: 2600 print('================================================================') 2601 print('ages', ages_capital, ages_rest) 2602 print('future', future_capital, future_rest) 2603 if ages_rest == 0: 2604 assert ages_capital == future_capital 2605 elif ages_rest < 0: 2606 assert -ages_capital == future_capital 2607 elif ages_rest > 0: 2608 assert ages_capital == ages_rest + future_capital 2609 self.reset() 2610 assert len(self._vault['history']) == 0 2611 2612 assert self._history() 2613 assert self._history(False) is False 2614 assert self._history() is False 2615 assert self._history(True) 2616 assert self._history() 2617 if debug: 2618 
print('####################################################################') 2619 2620 transaction = [ 2621 ( 2622 20, 'wallet', 1, -2000, -2000, -2000, 1, 1, 2623 2000, 2000, 2000, 1, 1, 2624 ), 2625 ( 2626 750, 'wallet', 'safe', -77000, -77000, -77000, 2, 2, 2627 75000, 75000, 75000, 1, 1, 2628 ), 2629 ( 2630 600, 'safe', 'bank', 15000, 15000, 15000, 1, 2, 2631 60000, 60000, 60000, 1, 1, 2632 ), 2633 ] 2634 for z in transaction: 2635 self.lock() 2636 x = z[1] 2637 y = z[2] 2638 self.transfer( 2639 unscaled_amount=z[0], 2640 from_account=x, 2641 to_account=y, 2642 desc='test-transfer', 2643 debug=debug, 2644 ) 2645 zz = self.balance(x) 2646 if debug: 2647 print(zz, z) 2648 assert zz == z[3] 2649 xx = self.accounts()[x] 2650 assert xx == z[3] 2651 assert self.balance(x, False) == z[4] 2652 assert xx == z[4] 2653 2654 s = 0 2655 log = self._vault['account'][x]['log'] 2656 for i in log: 2657 s += log[i]['value'] 2658 if debug: 2659 print('s', s, 'z[5]', z[5]) 2660 assert s == z[5] 2661 2662 assert self.box_size(x) == z[6] 2663 assert self.log_size(x) == z[7] 2664 2665 yy = self.accounts()[y] 2666 assert self.balance(y) == z[8] 2667 assert yy == z[8] 2668 assert self.balance(y, False) == z[9] 2669 assert yy == z[9] 2670 2671 s = 0 2672 log = self._vault['account'][y]['log'] 2673 for i in log: 2674 s += log[i]['value'] 2675 assert s == z[10] 2676 2677 assert self.box_size(y) == z[11] 2678 assert self.log_size(y) == z[12] 2679 assert self.free(self.lock()) 2680 2681 if debug: 2682 pp().pprint(self.check(2.17)) 2683 2684 assert not self.nolock() 2685 history_count = len(self._vault['history']) 2686 if debug: 2687 print('history-count', history_count) 2688 assert history_count == 4 2689 assert not self.free(ZakatTracker.time()) 2690 assert self.free(self.lock()) 2691 assert self.nolock() 2692 assert len(self._vault['history']) == 3 2693 2694 # storage 2695 2696 _path = self.path(f'./zakat_test_db/test.{self.ext()}') 2697 if os.path.exists(_path): 2698 os.remove(_path) 
2699 self.save() 2700 assert os.path.getsize(_path) > 0 2701 self.reset() 2702 assert self.recall(False, debug) is False 2703 self.load() 2704 assert self._vault['account'] is not None 2705 2706 # recall 2707 2708 assert self.nolock() 2709 assert len(self._vault['history']) == 3 2710 assert self.recall(False, debug) is True 2711 assert len(self._vault['history']) == 2 2712 assert self.recall(False, debug) is True 2713 assert len(self._vault['history']) == 1 2714 assert self.recall(False, debug) is True 2715 assert len(self._vault['history']) == 0 2716 assert self.recall(False, debug) is False 2717 assert len(self._vault['history']) == 0 2718 2719 # exchange 2720 2721 self.exchange("cash", 25, 3.75, "2024-06-25") 2722 self.exchange("cash", 22, 3.73, "2024-06-22") 2723 self.exchange("cash", 15, 3.69, "2024-06-15") 2724 self.exchange("cash", 10, 3.66) 2725 2726 for i in range(1, 30): 2727 exchange = self.exchange("cash", i) 2728 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2729 if debug: 2730 print(i, rate, description, created) 2731 assert created 2732 if i < 10: 2733 assert rate == 1 2734 assert description is None 2735 elif i == 10: 2736 assert rate == 3.66 2737 assert description is None 2738 elif i < 15: 2739 assert rate == 3.66 2740 assert description is None 2741 elif i == 15: 2742 assert rate == 3.69 2743 assert description is not None 2744 elif i < 22: 2745 assert rate == 3.69 2746 assert description is not None 2747 elif i == 22: 2748 assert rate == 3.73 2749 assert description is not None 2750 elif i >= 25: 2751 assert rate == 3.75 2752 assert description is not None 2753 exchange = self.exchange("bank", i) 2754 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2755 if debug: 2756 print(i, rate, description, created) 2757 assert created 2758 assert rate == 1 2759 assert description is None 2760 2761 assert len(self._vault['exchange']) > 0 2762 assert len(self.exchanges()) > 0 
2763 self._vault['exchange'].clear() 2764 assert len(self._vault['exchange']) == 0 2765 assert len(self.exchanges()) == 0 2766 2767 # Save exchange rates using dates in nanoseconds 2768 self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25") 2769 self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22") 2770 self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15") 2771 self.exchange("cash", ZakatTracker.day_to_time(10), 3.66) 2772 2773 for i in [x * 0.12 for x in range(-15, 21)]: 2774 if i <= 0: 2775 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0 2776 else: 2777 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0 2778 2779 # Test the results using dates in nanoseconds 2780 for i in range(1, 31): 2781 timestamp_ns = ZakatTracker.day_to_time(i) 2782 exchange = self.exchange("cash", timestamp_ns) 2783 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2784 if debug: 2785 print(i, rate, description, created) 2786 assert created 2787 if i < 10: 2788 assert rate == 1 2789 assert description is None 2790 elif i == 10: 2791 assert rate == 3.66 2792 assert description is None 2793 elif i < 15: 2794 assert rate == 3.66 2795 assert description is None 2796 elif i == 15: 2797 assert rate == 3.69 2798 assert description is not None 2799 elif i < 22: 2800 assert rate == 3.69 2801 assert description is not None 2802 elif i == 22: 2803 assert rate == 3.73 2804 assert description is not None 2805 elif i >= 25: 2806 assert rate == 3.75 2807 assert description is not None 2808 exchange = self.exchange("bank", i) 2809 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2810 if debug: 2811 print(i, rate, description, created) 2812 assert created 2813 assert rate == 1 2814 assert description is None 2815 2816 # csv 2817 2818 csv_count = 1000 2819 2820 for with_rate, path in { 2821 False: 
'test-import_csv-no-exchange', 2822 True: 'test-import_csv-with-exchange', 2823 }.items(): 2824 2825 if debug: 2826 print('test_import_csv', with_rate, path) 2827 2828 csv_path = path + '.csv' 2829 if os.path.exists(csv_path): 2830 os.remove(csv_path) 2831 c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug) 2832 if debug: 2833 print('generate_random_csv_file', c) 2834 assert c == csv_count 2835 assert os.path.getsize(csv_path) > 0 2836 cache_path = self.import_csv_cache_path() 2837 if os.path.exists(cache_path): 2838 os.remove(cache_path) 2839 self.reset() 2840 (created, found, bad) = self.import_csv(csv_path, debug) 2841 bad_count = len(bad) 2842 assert bad_count > 0 2843 if debug: 2844 print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})") 2845 print('bad', bad) 2846 tmp_size = os.path.getsize(cache_path) 2847 assert tmp_size > 0 2848 # TODO: assert created + found + bad_count == csv_count 2849 # TODO: assert created == csv_count 2850 # TODO: assert bad_count == 0 2851 (created_2, found_2, bad_2) = self.import_csv(csv_path) 2852 bad_2_count = len(bad_2) 2853 if debug: 2854 print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})") 2855 print('bad', bad) 2856 assert bad_2_count > 0 2857 # TODO: assert tmp_size == os.path.getsize(cache_path) 2858 # TODO: assert created_2 + found_2 + bad_2_count == csv_count 2859 # TODO: assert created == found_2 2860 # TODO: assert bad_count == bad_2_count 2861 # TODO: assert found_2 == csv_count 2862 # TODO: assert bad_2_count == 0 2863 # TODO: assert created_2 == 0 2864 2865 # payment parts 2866 2867 positive_parts = self.build_payment_parts(100, positive_only=True) 2868 assert self.check_payment_parts(positive_parts) != 0 2869 assert self.check_payment_parts(positive_parts) != 0 2870 all_parts = self.build_payment_parts(300, positive_only=False) 2871 assert self.check_payment_parts(all_parts) != 0 2872 assert self.check_payment_parts(all_parts) != 0 2873 if debug: 2874 
                pp().pprint(positive_parts)
                pp().pprint(all_parts)
            # dynamic discount
            suite = []
            count = 3
            for exceed in [False, True]:
                case = []
                for parts in [positive_parts, all_parts]:
                    part = parts.copy()
                    demand = part['demand']
                    if debug:
                        print(demand, part['total'])
                    i = 0
                    z = demand / count
                    cp = {
                        'account': {},
                        'demand': demand,
                        'exceed': exceed,
                        'total': part['total'],
                    }
                    j = ''
                    for x, y in part['account'].items():
                        x_exchange = self.exchange(x)
                        zz = self.exchange_calc(z, 1, x_exchange['rate'])
                        if exceed and zz <= demand:
                            i += 1
                            y['part'] = zz
                            if debug:
                                print(exceed, y)
                            cp['account'][x] = y
                            case.append(y)
                        elif not exceed and y['balance'] >= zz:
                            i += 1
                            y['part'] = zz
                            if debug:
                                print(exceed, y)
                            cp['account'][x] = y
                            case.append(y)
                        j = x
                        if i >= count:
                            break
                    if len(cp['account'][j]) > 0:
                        suite.append(cp)
            if debug:
                print('suite', len(suite))
            # vault = self._vault.copy()
            for case in suite:
                # self._vault = vault.copy()
                if debug:
                    print('case', case)
                result = self.check_payment_parts(case)
                if debug:
                    print('check_payment_parts', result, f'exceed: {exceed}')
                assert result == 0

                report = self.check(2.17, None, debug)
                (valid, brief, plan) = report
                if debug:
                    print('valid', valid)
                zakat_result = self.zakat(report, parts=case, debug=debug)
                if debug:
                    print('zakat-result', zakat_result)
                assert valid == zakat_result

            assert self.save(path + f'.{self.ext()}')
            assert self.export_json(path + '.json')

            assert self.export_json("1000-transactions-test.json")
            assert self.save(f"1000-transactions-test.{self.ext()}")

            self.reset()

            # test transfer between accounts with different exchange rate

            a_SAR = "Bank (SAR)"
            b_USD = "Bank (USD)"
            c_SAR = "Safe (SAR)"
            # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer
            for case in [
                (0, a_SAR, "SAR Gift", 1000, 100000),
                (1, a_SAR, 1),
                (0, b_USD, "USD Gift", 500, 50000),
                (1, b_USD, 1),
                (2, b_USD, 3.75),
                (1, b_USD, 3.75),
                (3, 100, b_USD, a_SAR, "100 USD -> SAR", 40000, 137500),
                (0, c_SAR, "Salary", 750, 75000),
                (3, 375, c_SAR, b_USD, "375 SAR -> USD", 37500, 50000),
                (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 137125, 50100),
            ]:
                if debug:
                    print('case', case)
                match (case[0]):
                    case 0:  # track
                        _, account, desc, x, balance = case
                        self.track(unscaled_value=x, desc=desc, account=account, debug=debug)

                        cached_value = self.balance(account, cached=True)
                        fresh_value = self.balance(account, cached=False)
                        if debug:
                            print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value)
                        assert cached_value == balance
                        assert fresh_value == balance
                    case 1:  # check-exchange
                        _, account, expected_rate = case
                        t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug)
                        if debug:
                            print('t-exchange', t_exchange)
                        assert t_exchange['rate'] == expected_rate
                    case 2:  # do-exchange
                        _, account, rate = case
                        self.exchange(account, rate=rate, debug=debug)
                        b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug)
                        if debug:
                            print('b-exchange', b_exchange)
                        assert b_exchange['rate'] == rate
                    case 3:  # transfer
                        _, x, a, b, desc, a_balance, b_balance = case
                        self.transfer(x, a, b, desc, debug=debug)

                        cached_value = self.balance(a, cached=True)
                        fresh_value = self.balance(a, cached=False)
                        if debug:
                            print('account', a, 'cached_value', cached_value, 'fresh_value', fresh_value, 'a_balance', a_balance)
                        assert cached_value == a_balance
                        assert fresh_value == a_balance

                        cached_value = self.balance(b, cached=True)
                        fresh_value = self.balance(b, cached=False)
                        if debug:
                            print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value)
                        assert cached_value == b_balance
                        assert fresh_value == b_balance

            # Transfer all in many chunks randomly from B to A
            a_SAR_balance = 137125
            b_USD_balance = 50100
            b_USD_exchange = self.exchange(b_USD)
            amounts = ZakatTracker.create_random_list(b_USD_balance, max_value=1000)
            if debug:
                print('amounts', amounts)
            i = 0
            for x in amounts:
                if debug:
                    print(f'{i} - transfer-with-exchange({x})')
                self.transfer(
                    unscaled_amount=self.unscale(x),
                    from_account=b_USD,
                    to_account=a_SAR,
                    desc=f"{x} USD -> SAR",
                    debug=debug,
                )

                b_USD_balance -= x
                cached_value = self.balance(b_USD, cached=True)
                fresh_value = self.balance(b_USD, cached=False)
                if debug:
                    print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted',
                          b_USD_balance)
                assert cached_value == b_USD_balance
                assert fresh_value == b_USD_balance

                a_SAR_balance += int(x * b_USD_exchange['rate'])
                cached_value = self.balance(a_SAR, cached=True)
                fresh_value = self.balance(a_SAR, cached=False)
                if debug:
                    print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected',
                          a_SAR_balance, 'rate', b_USD_exchange['rate'])
                assert cached_value == a_SAR_balance
                assert fresh_value == a_SAR_balance
                i += 1

            # Transfer all in many chunks randomly from C to A
            c_SAR_balance = 37500
            amounts = ZakatTracker.create_random_list(c_SAR_balance, max_value=1000)
            if debug:
                print('amounts', amounts)
            i = 0
            for x in amounts:
                if debug:
                    print(f'{i} - transfer-with-exchange({x})')
                self.transfer(
                    unscaled_amount=self.unscale(x),
                    from_account=c_SAR,
                    to_account=a_SAR,
                    desc=f"{x} SAR -> a_SAR",
                    debug=debug,
                )

                c_SAR_balance -= x
                cached_value = self.balance(c_SAR, cached=True)
                fresh_value = self.balance(c_SAR, cached=False)
                if debug:
                    print('account', c_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted',
                          c_SAR_balance)
                assert cached_value == c_SAR_balance
                assert fresh_value == c_SAR_balance

                a_SAR_balance += x
                cached_value = self.balance(a_SAR, cached=True)
                fresh_value = self.balance(a_SAR, cached=False)
                if debug:
                    print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected',
                          a_SAR_balance)
                assert cached_value == a_SAR_balance
                assert fresh_value == a_SAR_balance
                i += 1

            assert self.export_json("accounts-transfer-with-exchange-rates.json")
            assert self.save(f"accounts-transfer-with-exchange-rates.{self.ext()}")

            # check & zakat with exchange rates for many cycles

            for rate, values in {
                1: {
                    'in': [1000, 2000, 10000],
                    'exchanged': [100000, 200000, 1000000],
                    'out': [2500, 5000, 73140],
                },
                3.75: {
                    'in': [200, 1000, 5000],
                    'exchanged': [75000, 375000, 1875000],
                    'out': [1875, 9375, 137138],
                },
            }.items():
                a, b, c = values['in']
                m, n, o = values['exchanged']
                x, y, z = values['out']
                if debug:
                    print('rate', rate, 'values', values)
                for case in [
                    (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [
                        {'safe': {0: {'below_nisab': x}}},
                    ], False, m),
                    (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [
                        {'safe': {0: {'count': 1, 'total': y}}},
                    ], True, n),
                    (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [
                        {'cave': {0: {'count': 3, 'total': z}}},
                    ], True, o),
                ]:
                    if debug:
                        print(f"############# check(rate: {rate}) #############")
                        print('case', case)
                    self.reset()
                    self.exchange(account=case[1], created=case[2], rate=rate)
                    self.track(unscaled_value=case[0], desc='test-check', account=case[1], logging=True, created=case[2])
                    assert self.snapshot()

                    # assert self.nolock()
                    # history_size = len(self._vault['history'])
                    # print('history_size', history_size)
                    # assert history_size == 2
                    assert self.lock()
                    assert not self.nolock()
                    report = self.check(2.17, None, debug)
                    (valid, brief, plan) = report
                    if debug:
                        print('brief', brief)
                    assert valid == case[4]
                    assert case[5] == brief[0]
                    assert case[5] == brief[1]

                    if debug:
                        pp().pprint(plan)

                    for x in plan:
                        assert case[1] == x
                        if 'total' in case[3][0][x][0].keys():
                            assert case[3][0][x][0]['total'] == int(brief[2])
                            assert int(plan[x][0]['total']) == case[3][0][x][0]['total']
                            assert int(plan[x][0]['count']) == case[3][0][x][0]['count']
                        else:
                            assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab']
                    if debug:
                        pp().pprint(report)
                    result = self.zakat(report, debug=debug)
                    if debug:
                        print('zakat-result', result, case[4])
                    assert result == case[4]
                    report = self.check(2.17, None, debug)
                    (valid, brief, plan) = report
                    assert valid is False

            history_size = len(self._vault['history'])
            if debug:
                print('history_size', history_size)
            assert history_size == 3
            assert not self.nolock()
            assert self.recall(False, debug) is False
            self.free(self.lock())
            assert self.nolock()

            for i in range(3, 0, -1):
                history_size = len(self._vault['history'])
                if debug:
                    print('history_size', history_size)
                assert history_size == i
                assert self.recall(False, debug) is True

            assert self.nolock()
            assert self.recall(False, debug) is False

            history_size = len(self._vault['history'])
            if debug:
                print('history_size', history_size)
            assert history_size == 0

            account_size = len(self._vault['account'])
            if debug:
                print('account_size', account_size)
            assert account_size == 0

            report_size = len(self._vault['report'])
            if debug:
                print('report_size', report_size)
            assert report_size == 0

            assert self.nolock()
            return True
        except Exception as e:
            # pp().pprint(self._vault)
            assert self.export_json("test-snapshot.json")
            assert self.save(f"test-snapshot.{self.ext()}")
            raise e
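The expected balances and Zakat totals hard-coded in the test above can be reproduced by hand. The following is a minimal sketch of that arithmetic, not the library's implementation: `SCALE`, `ZAKAT_CUT`, `transfer` and `zakat_total` are hypothetical names, the scale factor of 100 is inferred from the test data (tracking 1000 yields a balance of 100000), and the 2.5% cut applied once per elapsed cycle on the remaining balance is an assumption that happens to match the `out` values in the test table.

```python
from decimal import Decimal

SCALE = 100                   # assumption: balances are stored as integers with two decimal places
ZAKAT_CUT = Decimal("0.025")  # assumption: 2.5% per completed cycle


def transfer(balances, rates, amount, src, dst):
    """Move `amount` (in src currency units) from src to dst, converting by exchange rate."""
    amount = Decimal(amount)
    balances[src] -= int(amount * SCALE)
    # Convert through the base currency: src units * src rate / dst rate.
    converted = amount * rates[src] / rates[dst]
    balances[dst] += int(converted * SCALE)


def zakat_total(scaled_balance, cycles):
    """Sum the cut over several cycles, each taken from what remains after the previous one."""
    remaining = Decimal(scaled_balance)
    total = Decimal(0)
    for _ in range(cycles):
        due = remaining * ZAKAT_CUT
        total += due
        remaining -= due
    return int(total)


# State after the three track() calls in the test: 1000 SAR, 500 USD, 750 SAR.
balances = {"Bank (SAR)": 100000, "Bank (USD)": 50000, "Safe (SAR)": 75000}
rates = {"Bank (SAR)": Decimal("1"), "Bank (USD)": Decimal("3.75"), "Safe (SAR)": Decimal("1")}

transfer(balances, rates, "100", "Bank (USD)", "Bank (SAR)")   # 100 USD -> SAR
transfer(balances, rates, "375", "Safe (SAR)", "Bank (USD)")   # 375 SAR -> USD
transfer(balances, rates, "3.75", "Bank (SAR)", "Bank (USD)")  # 3.75 SAR -> USD

print(balances)  # {'Bank (SAR)': 137125, 'Bank (USD)': 50100, 'Safe (SAR)': 37500}
print(zakat_total(1_000_000, 3), zakat_total(1_875_000, 3))  # 73140 137138
```

The final balances match the starting values of `a_SAR_balance`, `b_USD_balance` and `c_SAR_balance` used by the random-chunk transfer loops, and the two totals match the three-cycle `out` entries for rates 1 and 3.75.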
def test(debug: bool = False):
    ledger = ZakatTracker("./zakat_test_db/zakat.camel")
    start = ZakatTracker.time()
    assert ledger.test(debug=debug)
    if debug:
        print("#########################")
        print("######## TEST DONE ########")
        print("#########################")
        print(ZakatTracker.duration_from_nanoseconds(ZakatTracker.time() - start))
        print("#########################")
class Action(Enum):
    CREATE = auto()
    TRACK = auto()
    LOG = auto()
    SUB = auto()
    ADD_FILE = auto()
    REMOVE_FILE = auto()
    BOX_TRANSFER = auto()
    EXCHANGE = auto()
    REPORT = auto()
    ZAKAT = auto()
class JSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Action) or isinstance(obj, MathOperation):
            return obj.name  # Serialize as the enum member's name
        elif isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)
Extensible JSON https://json.org encoder for Python data structures.
Supports the following objects and types by default:
+-------------------+---------------+
| Python            | JSON          |
+===================+===============+
| dict              | object        |
+-------------------+---------------+
| list, tuple       | array         |
+-------------------+---------------+
| str               | string        |
+-------------------+---------------+
| int, float        | number        |
+-------------------+---------------+
| True              | true          |
+-------------------+---------------+
| False             | false         |
+-------------------+---------------+
| None              | null          |
+-------------------+---------------+
To extend this to recognize other objects, subclass and implement a .default() method with another method that returns a serializable object for o if possible, otherwise it should call the superclass implementation (to raise TypeError).
    def default(self, obj):
        if isinstance(obj, Action) or isinstance(obj, MathOperation):
            return obj.name  # Serialize as the enum member's name
        elif isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this::
    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return super().default(o)
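The same `default()` hook is what lets the library's encoder serialize enum members and `Decimal` values. A minimal, self-contained sketch of the idea follows; `DemoAction` and `DemoEncoder` are hypothetical stand-ins defined here for illustration, not the library's own `Action` or `JSONEncoder`.

```python
import json
from decimal import Decimal
from enum import Enum, auto


class DemoAction(Enum):  # hypothetical stand-in for the library's Action enum
    CREATE = auto()
    TRACK = auto()


class DemoEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Enum):
            return obj.name    # enum members become their names
        if isinstance(obj, Decimal):
            return float(obj)  # Decimal becomes a JSON number (may lose precision)
        return super().default(obj)  # anything else raises TypeError


encoded = json.dumps({"action": DemoAction.TRACK, "value": Decimal("2.5")}, cls=DemoEncoder)
print(encoded)  # {"action": "TRACK", "value": 2.5}
```

Passing the encoder through `cls=` keeps the call site unchanged; note that converting `Decimal` to `float` trades exactness for a plain JSON number.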
def start_file_server(database_path: str, database_callback: callable = None, csv_callback: callable = None,
                      debug: bool = False) -> tuple:
    """
    Starts a multi-purpose HTTP server to manage file interactions for a Zakat application.

    This server facilitates the following functionalities:

    1. GET /{file_uuid}/get: Download the database file specified by `database_path`.
    2. GET /{file_uuid}/upload: Display an HTML form for uploading files.
    3. POST /{file_uuid}/upload: Handle file uploads, distinguishing between:
        - Database File (.db): Replaces the existing database with the uploaded one.
        - CSV File (.csv): Imports data from the CSV into the existing database.

    Args:
        database_path (str): The path to the pickle database file.
        database_callback (callable, optional): A function to call after a successful database upload.
                                                It receives the uploaded database path as its argument.
        csv_callback (callable, optional): A function to call after a successful CSV upload. It receives the uploaded CSV path,
                                           the database path, and the debug flag as its arguments.
        debug (bool, optional): If True, print debugging information. Defaults to False.

    Returns:
        Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing:
            - file_name (str): The name of the database file.
            - download_url (str): The URL to download the database file.
            - upload_url (str): The URL to access the file upload form.
            - server_thread (threading.Thread): The thread running the server.
            - shutdown_server (Callable[[], None]): A function to gracefully shut down the server.

    Example:
        _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db")
        print(f"Download database: {download_url}")
        print(f"Upload files: {upload_url}")
        server_thread.start()
        # ... later ...
        shutdown_server()
    """
    file_uuid = uuid.uuid4()
    file_name = os.path.basename(database_path)

    port = find_available_port()
    download_url = f"http://localhost:{port}/{file_uuid}/get"
    upload_url = f"http://localhost:{port}/{file_uuid}/upload"

    class Handler(http.server.SimpleHTTPRequestHandler):
        def do_GET(self):
            if self.path == f"/{file_uuid}/get":
                # GET: Serve the existing file
                try:
                    with open(database_path, "rb") as f:
                        self.send_response(200)
                        self.send_header("Content-type", "application/octet-stream")
                        self.send_header("Content-Disposition", f'attachment; filename="{file_name}"')
                        self.end_headers()
                        self.wfile.write(f.read())
                except FileNotFoundError:
                    self.send_error(404, "File not found")
            elif self.path == f"/{file_uuid}/upload":
                # GET: Serve the upload form
                self.send_response(200)
                self.send_header("Content-type", "text/html")
                self.end_headers()
                self.wfile.write(f"""
                    <html lang="en">
                        <head>
                            <title>Zakat File Server</title>
                        </head>
                    <body>
                    <h1>Zakat File Server</h1>
                    <h3>You can download the <a target="__blank" href="{download_url}">database file</a>...</h3>
                    <h3>Or upload a new file to restore a database or import `CSV` file:</h3>
                    <form action="/{file_uuid}/upload" method="post" enctype="multipart/form-data">
                        <input type="file" name="file" required><br/>
                        <input type="radio" id="{FileType.Database.value}" name="upload_type" value="{FileType.Database.value}" required>
                        <label for="database">Database File</label><br/>
                        <input type="radio"id="{FileType.CSV.value}" name="upload_type" value="{FileType.CSV.value}">
                        <label for="csv">CSV File</label><br/>
                        <input type="submit" value="Upload"><br/>
                    </form>
                    </body></html>
                """.encode())
            else:
                self.send_error(404)

        def do_POST(self):
            if self.path == f"/{file_uuid}/upload":
                # POST: Handle request
                # 1. Get the Form Data
                form_data = cgi.FieldStorage(
                    fp=self.rfile,
                    headers=self.headers,
                    environ={'REQUEST_METHOD': 'POST'}
                )
                upload_type = form_data.getvalue("upload_type")

                if debug:
                    print('upload_type', upload_type)

                if upload_type not in [FileType.Database.value, FileType.CSV.value]:
                    self.send_error(400, "Invalid upload type")
                    return

                # 2. Extract File Data
                file_item = form_data['file']  # Assuming 'file' is your file input name

                # 3. Get File Details
                filename = file_item.filename
                file_data = file_item.file.read()  # Read the file's content

                if debug:
                    print(f'Uploaded filename: {filename}')

                # 4. Define Storage Path for CSV
                upload_directory = "./uploads"  # Create this directory if it doesn't exist
                os.makedirs(upload_directory, exist_ok=True)
                file_path = os.path.join(upload_directory, upload_type)

                # 5. Write to Disk
                with open(file_path, 'wb') as f:
                    f.write(file_data)

                match upload_type:
                    case FileType.Database.value:

                        try:
                            # 6. Verify database file
                            # ZakatTracker(db_path=file_path) # FATAL, Circular Imports Error
                            if database_callback is not None:
                                database_callback(file_path)

                            # 7. Copy database into the original path
                            shutil.copy2(file_path, database_path)
                        except Exception as e:
                            self.send_error(400, str(e))
                            return

                    case FileType.CSV.value:
                        # 6. Verify CSV file
                        try:
                            # x = ZakatTracker(db_path=database_path) # FATAL, Circular Imports Error
                            # result = x.import_csv(file_path, debug=debug)
                            if csv_callback is not None:
                                result = csv_callback(file_path, database_path, debug)
                                if debug:
                                    print(f'CSV imported: {result}')
                                if len(result[2]) != 0:
                                    self.send_response(200)
                                    self.end_headers()
                                    self.wfile.write(json.dumps(result).encode())
                                    return
                        except Exception as e:
                            self.send_error(400, str(e))
                            return

                self.send_response(200)
                self.end_headers()
                self.wfile.write(b"File uploaded successfully.")

    httpd = socketserver.TCPServer(("localhost", port), Handler)
    server_thread = threading.Thread(target=httpd.serve_forever)

    def shutdown_server():
        nonlocal httpd, server_thread
        httpd.shutdown()
        httpd.server_close()  # Close the socket
        server_thread.join()  # Wait for the thread to finish

    return file_name, download_url, upload_url, server_thread, shutdown_server
Starts a multi-purpose HTTP server to manage file interactions for a Zakat application.
This server facilitates the following functionalities:
- GET /{file_uuid}/get: Download the database file specified by database_path.
- GET /{file_uuid}/upload: Display an HTML form for uploading files.
- POST /{file_uuid}/upload: Handle file uploads, distinguishing between:
    - Database File (.db): Replaces the existing database with the uploaded one.
    - CSV File (.csv): Imports data from the CSV into the existing database.
Args:
    database_path (str): The path to the pickle database file.
    database_callback (callable, optional): A function to call after a successful database upload. It receives the uploaded database path as its argument.
    csv_callback (callable, optional): A function to call after a successful CSV upload. It receives the uploaded CSV path, the database path, and the debug flag as its arguments.
    debug (bool, optional): If True, print debugging information. Defaults to False.

Returns:
    Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing:
        - file_name (str): The name of the database file.
        - download_url (str): The URL to download the database file.
        - upload_url (str): The URL to access the file upload form.
        - server_thread (threading.Thread): The thread running the server.
        - shutdown_server (Callable[[], None]): A function to gracefully shut down the server.

Example:
    _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db")
    print(f"Download database: {download_url}")
    print(f"Upload files: {upload_url}")
    server_thread.start()
    # ... later ...
    shutdown_server()
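Note that the returned thread is not started; the caller starts it and later calls the shutdown function. The lifecycle can be sketched in isolation with the standard library alone. This is an illustrative pattern under stated assumptions, not the library's server: `PingHandler` and `serve_and_fetch` are hypothetical names, and the handler returns a fixed body instead of a database file.

```python
import http.server
import socketserver
import threading
import urllib.request


class PingHandler(http.server.BaseHTTPRequestHandler):  # hypothetical handler
    def do_GET(self):
        body = b"ok"
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the example quiet


def serve_and_fetch() -> bytes:
    # Port 0 asks the OS for any free port.
    httpd = socketserver.TCPServer(("127.0.0.1", 0), PingHandler)
    port = httpd.server_address[1]
    thread = threading.Thread(target=httpd.serve_forever)
    thread.start()  # the caller starts the thread, as in the Example above
    try:
        # Bypass any proxy configured in the environment for this local request.
        opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
        with opener.open(f"http://127.0.0.1:{port}/") as response:
            return response.read()
    finally:
        httpd.shutdown()      # stop serve_forever()
        httpd.server_close()  # release the socket
        thread.join()         # wait for the thread to finish


result = serve_and_fetch()
print(result)
```

Calling `shutdown()` before `server_close()` and `join()` mirrors the order used by `shutdown_server` in the source above; `shutdown()` must be called from a thread other than the one running `serve_forever()`.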
def find_available_port() -> int:
    """
    Finds and returns an available TCP port on the local machine.

    This function utilizes a TCP server socket to bind to port 0, which
    instructs the operating system to automatically assign an available
    port. The assigned port is then extracted and returned.

    Returns:
        int: The available TCP port number.

    Raises:
        OSError: If an error occurs during the port binding process, such
            as all ports being in use.

    Example:
        port = find_available_port()
        print(f"Available port: {port}")
    """
    with socketserver.TCPServer(("localhost", 0), None) as s:
        return s.server_address[1]
Finds and returns an available TCP port on the local machine.
This function utilizes a TCP server socket to bind to port 0, which instructs the operating system to automatically assign an available port. The assigned port is then extracted and returned.
Returns:
    int: The available TCP port number.

Raises:
    OSError: If an error occurs during the port binding process, such as all ports being in use.

Example:
    port = find_available_port()
    print(f"Available port: {port}")
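The port-0 technique described above can also be shown with a plain socket. This is a sketch of the same idea rather than the library's code; `pick_free_port` is a hypothetical name.

```python
import socket


def pick_free_port(host: str = "127.0.0.1") -> int:
    """Bind to port 0 so the OS picks a free port, then report which one it chose."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, 0))
        return s.getsockname()[1]


port = pick_free_port()
print(f"Available port: {port}")
```

Either way, the port is released before the caller binds to it again, so another process could claim it in between. Binding the real server to port 0 directly and reading its address afterwards avoids that window.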