zakat
xxx
_____ _ _ _ _ _
|__ /__ _| | ____ _| |_ | | (_) |__ _ __ __ _ _ __ _ _
/ // _| |/ / _
| __| | | | | '_ \| '__/ _` | '__| | | |
/ /| (_| | < (_| | |_ | |___| | |_) | | | (_| | | | |_| |
/______,_|_|___,_|__| |_____|_|_.__/|_| __,_|_| __, |
|___/
"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف ... Never Trust, Always Verify ...
This file provides the ZakatLibrary classes, functions for tracking and calculating Zakat.
1""" 2 _____ _ _ _ _ _ 3|__ /__ _| | ____ _| |_ | | (_) |__ _ __ __ _ _ __ _ _ 4 / // _` | |/ / _` | __| | | | | '_ \| '__/ _` | '__| | | | 5 / /| (_| | < (_| | |_ | |___| | |_) | | | (_| | | | |_| | 6/____\__,_|_|\_\__,_|\__| |_____|_|_.__/|_| \__,_|_| \__, | 7 |___/ 8 9"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف 10... Never Trust, Always Verify ... 11 12This file provides the ZakatLibrary classes, functions for tracking and calculating Zakat. 13""" 14# Importing necessary classes and functions from the main module 15from zakat.zakat_tracker import ( 16 ZakatTracker, 17 test, 18 Action, 19 JSONEncoder, 20 MathOperation, 21) 22 23from zakat.file_server import ( 24 start_file_server, 25 find_available_port, 26 FileType, 27) 28 29# Version information for the module 30__version__ = ZakatTracker.Version() 31__all__ = [ 32 "ZakatTracker", 33 "test", 34 "Action", 35 "JSONEncoder", 36 "MathOperation", 37 "start_file_server", 38 "find_available_port", 39 "FileType", 40]
122class ZakatTracker: 123 """ 124 A class for tracking and calculating Zakat. 125 126 This class provides functionalities for recording transactions, calculating Zakat due, 127 and managing account balances. It also offers features like importing transactions from 128 CSV files, exporting data to JSON format, and saving/loading the tracker state. 129 130 The `ZakatTracker` class is designed to handle both positive and negative transactions, 131 allowing for flexible tracking of financial activities related to Zakat. It also supports 132 the concept of a "Nisab" (minimum threshold for Zakat) and a "haul" (complete one year for Transaction) can calculate Zakat due 133 based on the current silver price. 134 135 The class uses a camel file as its database to persist the tracker state, 136 ensuring data integrity across sessions. It also provides options for enabling or 137 disabling history tracking, allowing users to choose their preferred level of detail. 138 139 In addition, the `ZakatTracker` class includes various helper methods like 140 `time`, `time_to_datetime`, `lock`, `free`, `recall`, `export_json`, 141 and more. These methods provide additional functionalities and flexibility 142 for interacting with and managing the Zakat tracker. 143 144 Attributes: 145 ZakatTracker.ZakatCut (function): A function to calculate the Zakat percentage. 146 ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat. 147 ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price. 148 ZakatTracker.Version (function): The version of the ZakatTracker class. 149 150 Data Structure: 151 The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data. 152 153 _vault (dict): 154 - account (dict): 155 - {account_number} (dict): 156 - balance (int): The current balance of the account. 157 - box (dict): A dictionary storing transaction details. 158 - {timestamp} (dict): 159 - capital (int): The initial amount of the transaction. 160 - count (int): The number of times Zakat has been calculated for this transaction. 161 - last (int): The timestamp of the last Zakat calculation. 162 - rest (int): The remaining amount after Zakat deductions and withdrawal. 163 - total (int): The total Zakat deducted from this transaction. 164 - count (int): The total number of transactions for the account. 165 - log (dict): A dictionary storing transaction logs. 166 - {timestamp} (dict): 167 - value (int): The transaction amount (positive or negative). 168 - desc (str): The description of the transaction. 169 - ref (int): The box reference (positive or None). 170 - file (dict): A dictionary storing file references associated with the transaction. 171 - hide (bool): Indicates whether the account is hidden or not. 172 - zakatable (bool): Indicates whether the account is subject to Zakat. 173 - exchange (dict): 174 - account (dict): 175 - {timestamps} (dict): 176 - rate (float): Exchange rate when compared to local currency. 177 - description (str): The description of the exchange rate. 178 - history (dict): 179 - {timestamp} (list): A list of dictionaries storing the history of actions performed. 180 - {action_dict} (dict): 181 - action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT). 182 - account (str): The account number associated with the action. 183 - ref (int): The reference number of the transaction. 184 - file (int): The reference number of the file (if applicable). 185 - key (str): The key associated with the action (e.g., 'rest', 'total'). 186 - value (int): The value associated with the action. 187 - math (MathOperation): The mathematical operation performed (if applicable). 188 - lock (int or None): The timestamp indicating the current lock status (None if not locked). 189 - report (dict): 190 - {timestamp} (tuple): A tuple storing Zakat report details. 191 192 """ 193 194 @staticmethod 195 def Version() -> str: 196 """ 197 Returns the current version of the software. 198 199 This function returns a string representing the current version of the software, 200 including major, minor, and patch version numbers in the format "X.Y.Z". 201 202 Returns: 203 str: The current version of the software. 204 """ 205 return '0.2.87' 206 207 @staticmethod 208 def ZakatCut(x: float) -> float: 209 """ 210 Calculates the Zakat amount due on an asset. 211 212 This function calculates the zakat amount due on a given asset value over one lunar year. 213 Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth 214 that exceeds a certain threshold (Nisab). 215 216 Parameters: 217 x: The total value of the asset on which Zakat is to be calculated. 218 219 Returns: 220 The amount of Zakat due on the asset, calculated as 2.5% of the asset's value. 221 """ 222 return 0.025 * x # Zakat Cut in one Lunar Year 223 224 @staticmethod 225 def TimeCycle(days: int = 355) -> int: 226 """ 227 Calculates the approximate duration of a lunar year in nanoseconds. 228 229 This function calculates the approximate duration of a lunar year based on the given number of days. 230 It converts the given number of days into nanoseconds for use in high-precision timing applications. 231 232 Parameters: 233 days: The number of days in a lunar year. Defaults to 355, 234 which is an approximation of the average length of a lunar year. 235 236 Returns: 237 The approximate duration of a lunar year in nanoseconds. 238 """ 239 return int(60 * 60 * 24 * days * 1e9) # Lunar Year in nanoseconds 240 241 @staticmethod 242 def Nisab(gram_price: float, gram_quantity: float = 595) -> float: 243 """ 244 Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram. 245 246 This function calculates the Nisab value, which is the minimum threshold of wealth, 247 that makes an individual liable for paying Zakat. 248 The Nisab value is determined by the equivalent value of a specific amount 249 of gold or silver (currently 595 grams in silver) in the local currency. 250 251 Parameters: 252 - gram_price (float): The price per gram of Nisab. 253 - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver. 254 255 Returns: 256 - float: The total value of Nisab based on the given price per gram. 257 """ 258 return gram_price * gram_quantity 259 260 @staticmethod 261 def ext() -> str: 262 """ 263 Returns the file extension used by the ZakatTracker class. 264 265 Returns: 266 str: The file extension used by the ZakatTracker class, which is 'camel'. 267 """ 268 return 'camel' 269 270 def __init__(self, db_path: str = "./zakat_db/zakat.camel", history_mode: bool = True): 271 """ 272 Initialize ZakatTracker with database path and history mode. 273 274 Parameters: 275 db_path (str): The path to the database file. Default is "zakat.camel". 276 history_mode (bool): The mode for tracking history. Default is True. 277 278 Returns: 279 None 280 """ 281 self._base_path = None 282 self._vault_path = None 283 self._vault = None 284 self.reset() 285 self._history(history_mode) 286 self.path(db_path) 287 288 def path(self, path: str = None) -> str: 289 """ 290 Set or get the path to the database file. 291 292 If no path is provided, the current path is returned. 293 If a path is provided, it is set as the new path. 294 The function also creates the necessary directories if the provided path is a file. 295 296 Parameters: 297 path (str): The new path to the database file. If not provided, the current path is returned. 298 299 Returns: 300 str: The current or new path to the database file. 301 """ 302 if path is None: 303 return self._vault_path 304 self._vault_path = Path(path).resolve() 305 base_path = Path(path).resolve() 306 if base_path.is_file() or base_path.suffix: 307 base_path = base_path.parent 308 base_path.mkdir(parents=True, exist_ok=True) 309 self._base_path = base_path 310 return str(self._vault_path) 311 312 def base_path(self, *args) -> str: 313 """ 314 Generate a base path by joining the provided arguments with the existing base path. 315 316 Parameters: 317 *args (str): Variable length argument list of strings to be joined with the base path. 318 319 Returns: 320 str: The generated base path. If no arguments are provided, the existing base path is returned. 321 """ 322 if not args: 323 return str(self._base_path) 324 filtered_args = [] 325 ignored_filename = None 326 for arg in args: 327 if Path(arg).suffix: 328 ignored_filename = arg 329 else: 330 filtered_args.append(arg) 331 base_path = Path(self._base_path) 332 full_path = base_path.joinpath(*filtered_args) 333 full_path.mkdir(parents=True, exist_ok=True) 334 if ignored_filename is not None: 335 return full_path.resolve() / ignored_filename # Join with the ignored filename 336 return str(full_path.resolve()) 337 338 @staticmethod 339 def scale(x: float | int | Decimal, decimal_places: int = 2) -> int: 340 """ 341 Scales a numerical value by a specified power of 10, returning an integer. 342 343 This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and 344 facilitate precise scaling operations, particularly useful in financial or scientific calculations. 345 346 Parameters: 347 x: The numeric value to scale. Can be a floating-point number, integer, or decimal. 348 decimal_places: The exponent for the scaling factor (10**y). Defaults to 2, meaning the input is scaled 349 by a factor of 100 (e.g., converts 1.23 to 123). 350 351 Returns: 352 The scaled value, rounded to the nearest integer. 353 354 Raises: 355 TypeError: If the input `x` is not a valid numeric type. 356 357 Examples: 358 >>> ZakatTracker.scale(3.14159) 359 314 360 >>> ZakatTracker.scale(1234, decimal_places=3) 361 1234000 362 >>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4) 363 50 364 """ 365 if not isinstance(x, (float, int, Decimal)): 366 raise TypeError("Input 'x' must be a float, int, or Decimal.") 367 return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places)) 368 369 @staticmethod 370 def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal: 371 """ 372 Unscales an integer by a power of 10. 373 374 Parameters: 375 x: The integer to unscale. 376 return_type: The desired type for the returned value. Can be float, int, or Decimal. Defaults to float. 377 decimal_places: The power of 10 to use. Defaults to 2. 378 379 Returns: 380 The unscaled number, converted to the specified return_type. 381 382 Raises: 383 TypeError: If the return_type is not float or Decimal. 384 """ 385 if return_type not in (float, Decimal): 386 raise TypeError(f'Invalid return_type({return_type}). Supported types are float, int, and Decimal.') 387 return round(return_type(x / (10 ** decimal_places)), decimal_places) 388 389 def _history(self, status: bool = None) -> bool: 390 """ 391 Enable or disable history tracking. 392 393 Parameters: 394 status (bool): The status of history tracking. Default is True. 395 396 Returns: 397 None 398 """ 399 if status is not None: 400 self._history_mode = status 401 return self._history_mode 402 403 def reset(self) -> None: 404 """ 405 Reset the internal data structure to its initial state. 406 407 Parameters: 408 None 409 410 Returns: 411 None 412 """ 413 self._vault = { 414 'account': {}, 415 'exchange': {}, 416 'history': {}, 417 'lock': None, 418 'report': {}, 419 } 420 421 @staticmethod 422 def time(now: datetime = None) -> int: 423 """ 424 Generates a timestamp based on the provided datetime object or the current datetime. 425 426 Parameters: 427 now (datetime, optional): The datetime object to generate the timestamp from. 428 If not provided, the current datetime is used. 429 430 Returns: 431 int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970), 432 before 1970 will return in negative until 1000AD. 433 """ 434 if now is None: 435 now = datetime.datetime.now() 436 ordinal_day = now.toordinal() 437 ns_in_day = (now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() * 10 ** 9 438 return int((ordinal_day - 719_163) * 86_400_000_000_000 + ns_in_day) 439 440 @staticmethod 441 def time_to_datetime(ordinal_ns: int) -> datetime: 442 """ 443 Converts an ordinal number (number of days since 1000-01-01) to a datetime object. 444 445 Parameters: 446 ordinal_ns (int): The ordinal number of days since 1000-01-01. 447 448 Returns: 449 datetime: The corresponding datetime object. 450 """ 451 ordinal_day = ordinal_ns // 86_400_000_000_000 + 719_163 452 ns_in_day = ordinal_ns % 86_400_000_000_000 453 d = datetime.datetime.fromordinal(ordinal_day) 454 t = datetime.timedelta(seconds=ns_in_day // 10 ** 9) 455 return datetime.datetime.combine(d, datetime.time()) + t 456 457 def clean_history(self, lock: int | None = None) -> int: 458 """ 459 Cleans up the history of actions performed on the ZakatTracker instance. 460 461 Parameters: 462 lock (int, optional): The lock ID is used to clean up the empty history. 463 If not provided, it cleans up the empty history records for all locks. 464 465 Returns: 466 int: The number of locks cleaned up. 467 """ 468 count = 0 469 if lock in self._vault['history']: 470 if len(self._vault['history'][lock]) <= 0: 471 count += 1 472 del self._vault['history'][lock] 473 return count 474 self.free(self.lock()) 475 for lock in self._vault['history']: 476 if len(self._vault['history'][lock]) <= 0: 477 count += 1 478 del self._vault['history'][lock] 479 return count 480 481 def _step(self, action: Action = None, account=None, ref: int = None, file: int = None, value: float = None, 482 key: str = None, math_operation: MathOperation = None) -> int: 483 """ 484 This method is responsible for recording the actions performed on the ZakatTracker. 485 486 Parameters: 487 - action (Action): The type of action performed. 488 - account (str): The account number on which the action was performed. 489 - ref (int): The reference number of the action. 490 - file (int): The file reference number of the action. 491 - value (int): The value associated with the action. 492 - key (str): The key associated with the action. 493 - math_operation (MathOperation): The mathematical operation performed during the action. 494 495 Returns: 496 - int: The lock time of the recorded action. If no lock was performed, it returns 0. 497 """ 498 if not self._history(): 499 return 0 500 lock = self._vault['lock'] 501 if self.nolock(): 502 lock = self._vault['lock'] = self.time() 503 self._vault['history'][lock] = [] 504 if action is None: 505 return lock 506 self._vault['history'][lock].append({ 507 'action': action, 508 'account': account, 509 'ref': ref, 510 'file': file, 511 'key': key, 512 'value': value, 513 'math': math_operation, 514 }) 515 return lock 516 517 def nolock(self) -> bool: 518 """ 519 Check if the vault lock is currently not set. 520 521 Returns: 522 bool: True if the vault lock is not set, False otherwise. 523 """ 524 return self._vault['lock'] is None 525 526 def lock(self) -> int: 527 """ 528 Acquires a lock on the ZakatTracker instance. 529 530 Returns: 531 int: The lock ID. This ID can be used to release the lock later. 532 """ 533 return self._step() 534 535 def vault(self) -> dict: 536 """ 537 Returns a copy of the internal vault dictionary. 538 539 This method is used to retrieve the current state of the ZakatTracker object. 540 It provides a snapshot of the internal data structure, allowing for further 541 processing or analysis. 542 543 Returns: 544 dict: A copy of the internal vault dictionary. 545 """ 546 return self._vault.copy() 547 548 def stats(self) -> dict[str, tuple]: 549 """ 550 Calculates and returns statistics about the object's data storage. 551 552 This method determines the size of the database file on disk and the 553 size of the data currently held in RAM (likely within a dictionary). 554 Both sizes are reported in bytes and in a human-readable format 555 (e.g., KB, MB). 556 557 Returns: 558 dict[str, tuple]: A dictionary containing the following statistics: 559 560 * 'database': A tuple with two elements: 561 - The database file size in bytes (int). 562 - The database file size in human-readable format (str). 563 * 'ram': A tuple with two elements: 564 - The RAM usage (dictionary size) in bytes (int). 565 - The RAM usage in human-readable format (str). 566 567 Example: 568 >>> stats = my_object.stats() 569 >>> print(stats['database']) 570 (256000, '250.0 KB') 571 >>> print(stats['ram']) 572 (12345, '12.1 KB') 573 """ 574 ram_size = self.get_dict_size(self.vault()) 575 file_size = os.path.getsize(self.path()) 576 return { 577 'database': (file_size, self.human_readable_size(file_size)), 578 'ram': (ram_size, self.human_readable_size(ram_size)), 579 } 580 581 def files(self) -> list[dict[str, str | int]]: 582 """ 583 Retrieves information about files associated with this class. 584 585 This class method provides a standardized way to gather details about 586 files used by the class for storage, snapshots, and CSV imports. 587 588 Returns: 589 list[dict[str, str | int]]: A list of dictionaries, each containing information 590 about a specific file: 591 592 * type (str): The type of file ('database', 'snapshot', 'import_csv'). 593 * path (str): The full file path. 594 * exists (bool): Whether the file exists on the filesystem. 595 * size (int): The file size in bytes (0 if the file doesn't exist). 596 * human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB'). 597 598 Example: 599 ``` 600 file_info = MyClass.files() 601 for info in file_info: 602 print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}") 603 ``` 604 """ 605 result = [] 606 for file_type, path in { 607 'database': self.path(), 608 'snapshot': self.snapshot_cache_path(), 609 'import_csv': self.import_csv_cache_path(), 610 }.items(): 611 exists = os.path.exists(path) 612 size = os.path.getsize(path) if exists else 0 613 human_readable_size = self.human_readable_size(size) if exists else 0 614 result.append({ 615 'type': file_type, 616 'path': path, 617 'exists': exists, 618 'size': size, 619 'human_readable_size': human_readable_size, 620 }) 621 return result 622 623 def steps(self) -> dict: 624 """ 625 Returns a copy of the history of steps taken in the ZakatTracker. 626 627 The history is a dictionary where each key is a unique identifier for a step, 628 and the corresponding value is a dictionary containing information about the step. 629 630 Returns: 631 dict: A copy of the history of steps taken in the ZakatTracker. 632 """ 633 return self._vault['history'].copy() 634 635 def free(self, lock: int, auto_save: bool = True) -> bool: 636 """ 637 Releases the lock on the database. 638 639 Parameters: 640 lock (int): The lock ID to be released. 641 auto_save (bool): Whether to automatically save the database after releasing the lock. 642 643 Returns: 644 bool: True if the lock is successfully released and (optionally) saved, False otherwise. 645 """ 646 if lock == self._vault['lock']: 647 self._vault['lock'] = None 648 self.clean_history(lock) 649 if auto_save: 650 return self.save(self.path()) 651 return True 652 return False 653 654 def account_exists(self, account) -> bool: 655 """ 656 Check if the given account exists in the vault. 657 658 Parameters: 659 account (str): The account number to check. 660 661 Returns: 662 bool: True if the account exists, False otherwise. 663 """ 664 return account in self._vault['account'] 665 666 def box_size(self, account) -> int: 667 """ 668 Calculate the size of the box for a specific account. 669 670 Parameters: 671 account (str): The account number for which the box size needs to be calculated. 672 673 Returns: 674 int: The size of the box for the given account. If the account does not exist, -1 is returned. 675 """ 676 if self.account_exists(account): 677 return len(self._vault['account'][account]['box']) 678 return -1 679 680 def log_size(self, account) -> int: 681 """ 682 Get the size of the log for a specific account. 683 684 Parameters: 685 account (str): The account number for which the log size needs to be calculated. 686 687 Returns: 688 int: The size of the log for the given account. If the account does not exist, -1 is returned. 689 """ 690 if self.account_exists(account): 691 return len(self._vault['account'][account]['log']) 692 return -1 693 694 @staticmethod 695 def file_hash(file_path: str, algorithm: str = "blake2b") -> str: 696 """ 697 Calculates the hash of a file using the specified algorithm. 698 699 Parameters: 700 file_path (str): The path to the file. 701 algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b". 702 703 Returns: 704 str: The hexadecimal representation of the file's hash. 705 """ 706 hash_obj = hashlib.new(algorithm) # Create the hash object 707 with open(file_path, "rb") as f: # Open file in binary mode for reading 708 for chunk in iter(lambda: f.read(4096), b""): # Read file in chunks 709 hash_obj.update(chunk) 710 return hash_obj.hexdigest() # Return the hash as a hexadecimal string 711 712 def snapshot_cache_path(self): 713 """ 714 Generate the path for the cache file used to store snapshots. 715 716 The cache file is a camel file that stores the timestamps of the snapshots. 717 The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel". 718 719 Returns: 720 str: The path to the cache file. 721 """ 722 path = str(self.path()) 723 ext = self.ext() 724 ext_len = len(ext) 725 if path.endswith(f'.{ext}'): 726 path = path[:-ext_len-1] 727 _, filename = os.path.split(path + f'.snapshots.{ext}') 728 return self.base_path(filename) 729 730 def snapshot(self) -> bool: 731 """ 732 This function creates a snapshot of the current database state. 733 734 The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. 735 If a snapshot with the same hash exists, the function returns True without creating a new snapshot. 736 If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state 737 in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp. 738 739 Parameters: 740 None 741 742 Returns: 743 bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails. 744 """ 745 current_hash = self.file_hash(self.path()) 746 cache: dict[str, int] = {} # hash: time_ns 747 try: 748 with open(self.snapshot_cache_path(), 'r') as stream: 749 cache = camel.load(stream.read()) 750 except: 751 pass 752 if current_hash in cache: 753 return True 754 time = time_ns() 755 cache[current_hash] = time 756 if not self.save(self.base_path('snapshots', f'{time}.{self.ext()}')): 757 return False 758 with open(self.snapshot_cache_path(), 'w') as stream: 759 stream.write(camel.dump(cache)) 760 return True 761 762 def snapshots(self, hide_missing: bool = True, verified_hash_only: bool = False) \ 763 -> dict[int, tuple[str, str, bool]]: 764 """ 765 Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status. 766 767 Parameters: 768 - hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True. 769 - verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False. 770 771 Returns: 772 - dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, 773 and the values are tuples containing the snapshot's hash, path, and existence status. 774 """ 775 cache: dict[str, int] = {} # hash: time_ns 776 try: 777 with open(self.snapshot_cache_path(), 'r') as stream: 778 cache = camel.load(stream.read()) 779 except: 780 pass 781 if not cache: 782 return {} 783 result: dict[int, tuple[str, str, bool]] = {} # time_ns: (hash, path, exists) 784 for file_hash, ref in cache.items(): 785 path = self.base_path('snapshots', f'{ref}.{self.ext()}') 786 exists = os.path.exists(path) 787 valid_hash = self.file_hash(path) == file_hash if verified_hash_only else True 788 if (verified_hash_only and not valid_hash) or (verified_hash_only and not exists): 789 continue 790 if exists or not hide_missing: 791 result[ref] = (file_hash, path, exists) 792 return result 793 794 def recall(self, dry=True, debug=False) -> bool: 795 """ 796 Revert the last operation. 797 798 Parameters: 799 dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. 800 debug (bool): If True, the function will print debug information. Default is False. 801 802 Returns: 803 bool: True if the operation was successful, False otherwise. 804 """ 805 if not self.nolock() or len(self._vault['history']) == 0: 806 return False 807 if len(self._vault['history']) <= 0: 808 return False 809 ref = sorted(self._vault['history'].keys())[-1] 810 if debug: 811 print('recall', ref) 812 memory = self._vault['history'][ref] 813 if debug: 814 print(type(memory), 'memory', memory) 815 limit = len(memory) + 1 816 sub_positive_log_negative = 0 817 for i in range(-1, -limit, -1): 818 x = memory[i] 819 if debug: 820 print(type(x), x) 821 match x['action']: 822 case Action.CREATE: 823 if x['account'] is not None: 824 if self.account_exists(x['account']): 825 if debug: 826 print('account', self._vault['account'][x['account']]) 827 assert len(self._vault['account'][x['account']]['box']) == 0 828 assert self._vault['account'][x['account']]['balance'] == 0 829 assert self._vault['account'][x['account']]['count'] == 0 830 if dry: 831 continue 832 del self._vault['account'][x['account']] 833 834 case Action.TRACK: 835 if x['account'] is not None: 836 if self.account_exists(x['account']): 837 if dry: 838 continue 839 self._vault['account'][x['account']]['balance'] -= x['value'] 840 self._vault['account'][x['account']]['count'] -= 1 841 del self._vault['account'][x['account']]['box'][x['ref']] 842 843 case Action.LOG: 844 if x['account'] is not None: 845 if self.account_exists(x['account']): 846 if x['ref'] in self._vault['account'][x['account']]['log']: 847 if dry: 848 continue 849 if sub_positive_log_negative == -x['value']: 850 self._vault['account'][x['account']]['count'] -= 1 851 sub_positive_log_negative = 0 852 box_ref = self._vault['account'][x['account']]['log'][x['ref']]['ref'] 853 if not box_ref is None: 854 assert self.box_exists(x['account'], box_ref) 855 box_value = self._vault['account'][x['account']]['log'][x['ref']]['value'] 856 assert box_value < 0 857 858 try: 859 self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value 860 except TypeError: 861 self._vault['account'][x['account']]['box'][box_ref]['rest'] += Decimal( 862 -box_value) 863 864 try: 865 self._vault['account'][x['account']]['balance'] += -box_value 866 except TypeError: 867 self._vault['account'][x['account']]['balance'] += Decimal(-box_value) 868 869 self._vault['account'][x['account']]['count'] -= 1 870 del self._vault['account'][x['account']]['log'][x['ref']] 871 872 case Action.SUB: 873 if x['account'] is not None: 874 if self.account_exists(x['account']): 875 if x['ref'] in self._vault['account'][x['account']]['box']: 876 if dry: 877 continue 878 self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value'] 879 self._vault['account'][x['account']]['balance'] += x['value'] 880 sub_positive_log_negative = x['value'] 881 882 case Action.ADD_FILE: 883 if x['account'] is not None: 884 if self.account_exists(x['account']): 885 if x['ref'] in self._vault['account'][x['account']]['log']: 886 if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']: 887 if dry: 888 continue 889 del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] 890 891 case Action.REMOVE_FILE: 892 if x['account'] is not None: 893 if self.account_exists(x['account']): 894 if x['ref'] in self._vault['account'][x['account']]['log']: 895 if dry: 896 continue 897 self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value'] 898 899 case Action.BOX_TRANSFER: 900 if x['account'] is not None: 901 if self.account_exists(x['account']): 902 if x['ref'] in self._vault['account'][x['account']]['box']: 903 if dry: 904 continue 905 self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value'] 906 907 case Action.EXCHANGE: 908 if x['account'] is not None: 909 if x['account'] in self._vault['exchange']: 910 if x['ref'] in self._vault['exchange'][x['account']]: 911 if dry: 912 continue 913 del self._vault['exchange'][x['account']][x['ref']] 914 915 case Action.REPORT: 916 if x['ref'] in self._vault['report']: 917 if dry: 918 continue 919 del self._vault['report'][x['ref']] 920 921 case Action.ZAKAT: 922 if x['account'] is not None: 923 if self.account_exists(x['account']): 924 if x['ref'] in self._vault['account'][x['account']]['box']: 925 if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]: 926 if dry: 927 continue 928 match x['math']: 929 case MathOperation.ADDITION: 930 self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x[ 931 'value'] 932 case MathOperation.EQUAL: 933 self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value'] 934 case MathOperation.SUBTRACTION: 935 self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x[ 936 'value'] 937 938 if not dry: 939 del self._vault['history'][ref] 940 return True 941 942 def ref_exists(self, account: str, ref_type: str, ref: int) -> bool: 943 """ 944 Check if a specific reference (transaction) exists in the vault for a given account and reference type. 945 946 Parameters: 947 account (str): The account number for which to check the existence of the reference. 948 ref_type (str): The type of reference (e.g., 'box', 'log', etc.). 949 ref (int): The reference (transaction) number to check for existence. 950 951 Returns: 952 bool: True if the reference exists for the given account and reference type, False otherwise. 953 """ 954 if account in self._vault['account']: 955 return ref in self._vault['account'][account][ref_type] 956 return False 957 958 def box_exists(self, account: str, ref: int) -> bool: 959 """ 960 Check if a specific box (transaction) exists in the vault for a given account and reference. 961 962 Parameters: 963 - account (str): The account number for which to check the existence of the box. 964 - ref (int): The reference (transaction) number to check for existence. 965 966 Returns: 967 - bool: True if the box exists for the given account and reference, False otherwise. 968 """ 969 return self.ref_exists(account, 'box', ref) 970 971 def track(self, unscaled_value: float | int | Decimal = 0, desc: str = '', account: str = 1, logging: bool = True, 972 created: int = None, 973 debug: bool = False) -> int: 974 """ 975 This function tracks a transaction for a specific account. 976 977 Parameters: 978 unscaled_value (float | int | Decimal): The value of the transaction. Default is 0. 979 desc (str): The description of the transaction. Default is an empty string. 980 account (str): The account for which the transaction is being tracked. Default is '1'. 981 logging (bool): Whether to log the transaction. Default is True. 982 created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. 983 debug (bool): Whether to print debug information. Default is False. 984 985 Returns: 986 int: The timestamp of the transaction. 987 988 This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box. 989 990 Raises: 991 ValueError: The log transaction happened again in the same nanosecond time. 992 ValueError: The box transaction happened again in the same nanosecond time. 993 """ 994 if debug: 995 print('track', f'unscaled_value={unscaled_value}, debug={debug}') 996 if created is None: 997 created = self.time() 998 no_lock = self.nolock() 999 self.lock() 1000 if not self.account_exists(account): 1001 if debug: 1002 print(f"account {account} created") 1003 self._vault['account'][account] = { 1004 'balance': 0, 1005 'box': {}, 1006 'count': 0, 1007 'log': {}, 1008 'hide': False, 1009 'zakatable': True, 1010 } 1011 self._step(Action.CREATE, account) 1012 if unscaled_value == 0: 1013 if no_lock: 1014 self.free(self.lock()) 1015 return 0 1016 value = self.scale(unscaled_value) 1017 if logging: 1018 self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug) 1019 if debug: 1020 print('create-box', created) 1021 if self.box_exists(account, created): 1022 raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).") 1023 if debug: 1024 print('created-box', created) 1025 self._vault['account'][account]['box'][created] = { 1026 'capital': value, 1027 'count': 0, 1028 'last': 0, 1029 'rest': value, 1030 'total': 0, 1031 } 1032 self._step(Action.TRACK, account, ref=created, value=value) 1033 if no_lock: 1034 self.free(self.lock()) 1035 return created 1036 1037 def log_exists(self, account: str, ref: int) -> bool: 1038 """ 1039 Checks if a specific transaction log entry exists for a given account. 1040 1041 Parameters: 1042 account (str): The account number associated with the transaction log. 1043 ref (int): The reference to the transaction log entry. 1044 1045 Returns: 1046 bool: True if the transaction log entry exists, False otherwise. 1047 """ 1048 return self.ref_exists(account, 'log', ref) 1049 1050 def _log(self, value: float, desc: str = '', account: str = 1, created: int = None, ref: int = None, 1051 debug: bool = False) -> int: 1052 """ 1053 Log a transaction into the account's log. 1054 1055 Parameters: 1056 value (float): The value of the transaction. 1057 desc (str): The description of the transaction. 1058 account (str): The account to log the transaction into. Default is '1'. 1059 created (int): The timestamp of the transaction. If not provided, it will be generated. 1060 1061 Returns: 1062 int: The timestamp of the logged transaction. 1063 1064 This method updates the account's balance, count, and log with the transaction details. 1065 It also creates a step in the history of the transaction. 1066 1067 Raises: 1068 ValueError: The log transaction happened again in the same nanosecond time. 1069 """ 1070 if debug: 1071 print('_log', f'debug={debug}') 1072 if created is None: 1073 created = self.time() 1074 try: 1075 self._vault['account'][account]['balance'] += value 1076 except TypeError: 1077 self._vault['account'][account]['balance'] += Decimal(value) 1078 self._vault['account'][account]['count'] += 1 1079 if debug: 1080 print('create-log', created) 1081 if self.log_exists(account, created): 1082 raise ValueError(f"The log transaction happened again in the same nanosecond time({created}).") 1083 if debug: 1084 print('created-log', created) 1085 self._vault['account'][account]['log'][created] = { 1086 'value': value, 1087 'desc': desc, 1088 'ref': ref, 1089 'file': {}, 1090 } 1091 self._step(Action.LOG, account, ref=created, value=value) 1092 return created 1093 1094 def exchange(self, account, created: int = None, rate: float = None, description: str = None, 1095 debug: bool = False) -> dict: 1096 """ 1097 This method is used to record or retrieve exchange rates for a specific account. 1098 1099 Parameters: 1100 - account (str): The account number for which the exchange rate is being recorded or retrieved. 1101 - created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used. 1102 - rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate. 1103 - description (str): A description of the exchange rate. 1104 1105 Returns: 1106 - dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, 1107 it returns a dictionary with default values for the rate and description. 1108 """ 1109 if debug: 1110 print('exchange', f'debug={debug}') 1111 if created is None: 1112 created = self.time() 1113 no_lock = self.nolock() 1114 self.lock() 1115 if rate is not None: 1116 if rate <= 0: 1117 return dict() 1118 if account not in self._vault['exchange']: 1119 self._vault['exchange'][account] = {} 1120 if len(self._vault['exchange'][account]) == 0 and rate <= 1: 1121 return {"time": created, "rate": 1, "description": None} 1122 self._vault['exchange'][account][created] = {"rate": rate, "description": description} 1123 self._step(Action.EXCHANGE, account, ref=created, value=rate) 1124 if no_lock: 1125 self.free(self.lock()) 1126 if debug: 1127 print("exchange-created-1", 1128 f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1129 1130 if account in self._vault['exchange']: 1131 valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created] 1132 if valid_rates: 1133 latest_rate = max(valid_rates, key=lambda x: x[0]) 1134 if debug: 1135 print("exchange-read-1", 1136 f'account: {account}, created: {created}, rate:{rate}, description:{description}', 1137 'latest_rate', latest_rate) 1138 result = latest_rate[1] 1139 result['time'] = latest_rate[0] 1140 return result # إرجاع قاموس يحتوي على المعدل والوصف 1141 if debug: 1142 print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1143 return {"time": created, "rate": 1, "description": None} # إرجاع القيمة الافتراضية مع وصف فارغ 1144 1145 @staticmethod 1146 def exchange_calc(x: float, x_rate: float, y_rate: float) -> float: 1147 """ 1148 This function calculates the exchanged amount of a currency. 1149 1150 Args: 1151 x (float): The original amount of the currency. 1152 x_rate (float): The exchange rate of the original currency. 1153 y_rate (float): The exchange rate of the target currency. 1154 1155 Returns: 1156 float: The exchanged amount of the target currency. 1157 """ 1158 return (x * x_rate) / y_rate 1159 1160 def exchanges(self) -> dict: 1161 """ 1162 Retrieve the recorded exchange rates for all accounts. 1163 1164 Parameters: 1165 None 1166 1167 Returns: 1168 dict: A dictionary containing all recorded exchange rates. 1169 The keys are account names or numbers, and the values are dictionaries containing the exchange rates. 1170 Each exchange rate dictionary has timestamps as keys and exchange rate details as values. 1171 """ 1172 return self._vault['exchange'].copy() 1173 1174 def accounts(self) -> dict: 1175 """ 1176 Returns a dictionary containing account numbers as keys and their respective balances as values. 1177 1178 Parameters: 1179 None 1180 1181 Returns: 1182 dict: A dictionary where keys are account numbers and values are their respective balances. 1183 """ 1184 result = {} 1185 for i in self._vault['account']: 1186 result[i] = self._vault['account'][i]['balance'] 1187 return result 1188 1189 def boxes(self, account) -> dict: 1190 """ 1191 Retrieve the boxes (transactions) associated with a specific account. 1192 1193 Parameters: 1194 account (str): The account number for which to retrieve the boxes. 1195 1196 Returns: 1197 dict: A dictionary containing the boxes associated with the given account. 1198 If the account does not exist, an empty dictionary is returned. 1199 """ 1200 if self.account_exists(account): 1201 return self._vault['account'][account]['box'] 1202 return {} 1203 1204 def logs(self, account) -> dict: 1205 """ 1206 Retrieve the logs (transactions) associated with a specific account. 1207 1208 Parameters: 1209 account (str): The account number for which to retrieve the logs. 1210 1211 Returns: 1212 dict: A dictionary containing the logs associated with the given account. 1213 If the account does not exist, an empty dictionary is returned. 1214 """ 1215 if self.account_exists(account): 1216 return self._vault['account'][account]['log'] 1217 return {} 1218 1219 def add_file(self, account: str, ref: int, path: str) -> int: 1220 """ 1221 Adds a file reference to a specific transaction log entry in the vault. 1222 1223 Parameters: 1224 account (str): The account number associated with the transaction log. 1225 ref (int): The reference to the transaction log entry. 1226 path (str): The path of the file to be added. 1227 1228 Returns: 1229 int: The reference of the added file. If the account or transaction log entry does not exist, returns 0. 1230 """ 1231 if self.account_exists(account): 1232 if ref in self._vault['account'][account]['log']: 1233 file_ref = self.time() 1234 self._vault['account'][account]['log'][ref]['file'][file_ref] = path 1235 no_lock = self.nolock() 1236 self.lock() 1237 self._step(Action.ADD_FILE, account, ref=ref, file=file_ref) 1238 if no_lock: 1239 self.free(self.lock()) 1240 return file_ref 1241 return 0 1242 1243 def remove_file(self, account: str, ref: int, file_ref: int) -> bool: 1244 """ 1245 Removes a file reference from a specific transaction log entry in the vault. 1246 1247 Parameters: 1248 account (str): The account number associated with the transaction log. 1249 ref (int): The reference to the transaction log entry. 1250 file_ref (int): The reference of the file to be removed. 1251 1252 Returns: 1253 bool: True if the file reference is successfully removed, False otherwise. 1254 """ 1255 if self.account_exists(account): 1256 if ref in self._vault['account'][account]['log']: 1257 if file_ref in self._vault['account'][account]['log'][ref]['file']: 1258 x = self._vault['account'][account]['log'][ref]['file'][file_ref] 1259 del self._vault['account'][account]['log'][ref]['file'][file_ref] 1260 no_lock = self.nolock() 1261 self.lock() 1262 self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x) 1263 if no_lock: 1264 self.free(self.lock()) 1265 return True 1266 return False 1267 1268 def balance(self, account: str = 1, cached: bool = True) -> int: 1269 """ 1270 Calculate and return the balance of a specific account. 1271 1272 Parameters: 1273 account (str): The account number. Default is '1'. 1274 cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True. 1275 1276 Returns: 1277 int: The balance of the account. 1278 1279 Note: 1280 If cached is True, the function returns the cached balance. 1281 If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items. 1282 """ 1283 if cached: 1284 return self._vault['account'][account]['balance'] 1285 x = 0 1286 return [x := x + y['rest'] for y in self._vault['account'][account]['box'].values()][-1] 1287 1288 def hide(self, account, status: bool = None) -> bool: 1289 """ 1290 Check or set the hide status of a specific account. 1291 1292 Parameters: 1293 account (str): The account number. 1294 status (bool, optional): The new hide status. If not provided, the function will return the current status. 1295 1296 Returns: 1297 bool: The current or updated hide status of the account. 1298 1299 Raises: 1300 None 1301 1302 Example: 1303 >>> tracker = ZakatTracker() 1304 >>> ref = tracker.track(51, 'desc', 'account1') 1305 >>> tracker.hide('account1') # Set the hide status of 'account1' to True 1306 False 1307 >>> tracker.hide('account1', True) # Set the hide status of 'account1' to True 1308 True 1309 >>> tracker.hide('account1') # Get the hide status of 'account1' by default 1310 True 1311 >>> tracker.hide('account1', False) 1312 False 1313 """ 1314 if self.account_exists(account): 1315 if status is None: 1316 return self._vault['account'][account]['hide'] 1317 self._vault['account'][account]['hide'] = status 1318 return status 1319 return False 1320 1321 def zakatable(self, account, status: bool = None) -> bool: 1322 """ 1323 Check or set the zakatable status of a specific account. 1324 1325 Parameters: 1326 account (str): The account number. 1327 status (bool, optional): The new zakatable status. If not provided, the function will return the current status. 1328 1329 Returns: 1330 bool: The current or updated zakatable status of the account. 1331 1332 Raises: 1333 None 1334 1335 Example: 1336 >>> tracker = ZakatTracker() 1337 >>> ref = tracker.track(51, 'desc', 'account1') 1338 >>> tracker.zakatable('account1') # Set the zakatable status of 'account1' to True 1339 True 1340 >>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True 1341 True 1342 >>> tracker.zakatable('account1') # Get the zakatable status of 'account1' by default 1343 True 1344 >>> tracker.zakatable('account1', False) 1345 False 1346 """ 1347 if self.account_exists(account): 1348 if status is None: 1349 return self._vault['account'][account]['zakatable'] 1350 self._vault['account'][account]['zakatable'] = status 1351 return status 1352 return False 1353 1354 def sub(self, unscaled_value: float | int | Decimal, desc: str = '', account: str = 1, created: int = None, 1355 debug: bool = False) \ 1356 -> tuple[ 1357 int, 1358 list[ 1359 tuple[int, int], 1360 ], 1361 ] | tuple: 1362 """ 1363 Subtracts a specified value from an account's balance. 1364 1365 Parameters: 1366 unscaled_value (float | int | Decimal): The amount to be subtracted. 1367 desc (str): A description for the transaction. Defaults to an empty string. 1368 account (str): The account from which the value will be subtracted. Defaults to '1'. 1369 created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. 1370 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1371 1372 Returns: 1373 tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction. 1374 1375 If the amount to subtract is greater than the account's balance, 1376 the remaining amount will be transferred to a new transaction with a negative value. 1377 1378 Raises: 1379 ValueError: The box transaction happened again in the same nanosecond time. 1380 ValueError: The log transaction happened again in the same nanosecond time. 1381 """ 1382 if debug: 1383 print('sub', f'debug={debug}') 1384 if unscaled_value < 0: 1385 return tuple() 1386 if unscaled_value == 0: 1387 ref = self.track(unscaled_value, '', account) 1388 return ref, ref 1389 if created is None: 1390 created = self.time() 1391 no_lock = self.nolock() 1392 self.lock() 1393 self.track(0, '', account) 1394 value = self.scale(unscaled_value) 1395 self._log(value=-value, desc=desc, account=account, created=created, ref=None, debug=debug) 1396 ids = sorted(self._vault['account'][account]['box'].keys()) 1397 limit = len(ids) + 1 1398 target = value 1399 if debug: 1400 print('ids', ids) 1401 ages = [] 1402 for i in range(-1, -limit, -1): 1403 if target == 0: 1404 break 1405 j = ids[i] 1406 if debug: 1407 print('i', i, 'j', j) 1408 rest = self._vault['account'][account]['box'][j]['rest'] 1409 if rest >= target: 1410 self._vault['account'][account]['box'][j]['rest'] -= target 1411 self._step(Action.SUB, account, ref=j, value=target) 1412 ages.append((j, target)) 1413 target = 0 1414 break 1415 elif target > rest > 0: 1416 chunk = rest 1417 target -= chunk 1418 self._step(Action.SUB, account, ref=j, value=chunk) 1419 ages.append((j, chunk)) 1420 self._vault['account'][account]['box'][j]['rest'] = 0 1421 if target > 0: 1422 self.track( 1423 unscaled_value=self.unscale(-target), 1424 desc=desc, 1425 account=account, 1426 logging=False, 1427 created=created, 1428 ) 1429 ages.append((created, target)) 1430 if no_lock: 1431 self.free(self.lock()) 1432 return created, ages 1433 1434 def transfer(self, unscaled_amount: float | int | Decimal, from_account: str, to_account: str, desc: str = '', 1435 created: int = None, 1436 debug: bool = False) -> list[int]: 1437 """ 1438 Transfers a specified value from one account to another. 1439 1440 Parameters: 1441 unscaled_amount (float | int | Decimal): The amount to be transferred. 1442 from_account (str): The account from which the value will be transferred. 1443 to_account (str): The account to which the value will be transferred. 1444 desc (str, optional): A description for the transaction. Defaults to an empty string. 1445 created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. 1446 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1447 1448 Returns: 1449 list[int]: A list of timestamps corresponding to the transactions made during the transfer. 1450 1451 Raises: 1452 ValueError: Transfer to the same account is forbidden. 1453 ValueError: The box transaction happened again in the same nanosecond time. 1454 ValueError: The log transaction happened again in the same nanosecond time. 1455 """ 1456 if debug: 1457 print('transfer', f'debug={debug}') 1458 if from_account == to_account: 1459 raise ValueError(f'Transfer to the same account is forbidden. {to_account}') 1460 if unscaled_amount <= 0: 1461 return [] 1462 if created is None: 1463 created = self.time() 1464 (_, ages) = self.sub(unscaled_amount, desc, from_account, created, debug=debug) 1465 times = [] 1466 source_exchange = self.exchange(from_account, created) 1467 target_exchange = self.exchange(to_account, created) 1468 1469 if debug: 1470 print('ages', ages) 1471 1472 for age, value in ages: 1473 target_amount = int(self.exchange_calc(value, source_exchange['rate'], target_exchange['rate'])) 1474 if debug: 1475 print('target_amount', target_amount) 1476 # Perform the transfer 1477 if self.box_exists(to_account, age): 1478 if debug: 1479 print('box_exists', age) 1480 capital = self._vault['account'][to_account]['box'][age]['capital'] 1481 rest = self._vault['account'][to_account]['box'][age]['rest'] 1482 if debug: 1483 print( 1484 f"Transfer(loop) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1485 selected_age = age 1486 if rest + target_amount > capital: 1487 self._vault['account'][to_account]['box'][age]['capital'] += target_amount 1488 selected_age = ZakatTracker.time() 1489 self._vault['account'][to_account]['box'][age]['rest'] += target_amount 1490 self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount) 1491 y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account, 1492 created=None, ref=None, debug=debug) 1493 times.append((age, y)) 1494 continue 1495 if debug: 1496 print( 1497 f"Transfer(func) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1498 y = self.track( 1499 unscaled_value=self.unscale(int(target_amount)), 1500 desc=desc, 1501 account=to_account, 1502 logging=True, 1503 created=age, 1504 debug=debug, 1505 ) 1506 times.append(y) 1507 return times 1508 1509 def check(self, silver_gram_price: float, unscaled_nisab: float | int | Decimal = None, debug: bool = False, now: int = None, 1510 cycle: float = None) -> tuple: 1511 """ 1512 Check the eligibility for Zakat based on the given parameters. 1513 1514 Parameters: 1515 silver_gram_price (float): The price of a gram of silver. 1516 unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, 1517 it will be calculated based on the silver_gram_price. 1518 debug (bool): Flag to enable debug mode. 1519 now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). 1520 cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle(). 1521 1522 Returns: 1523 tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, 1524 and a dictionary containing the Zakat plan. 1525 """ 1526 if debug: 1527 print('check', f'debug={debug}') 1528 if now is None: 1529 now = self.time() 1530 if cycle is None: 1531 cycle = ZakatTracker.TimeCycle() 1532 if unscaled_nisab is None: 1533 unscaled_nisab = ZakatTracker.Nisab(silver_gram_price) 1534 nisab = self.scale(unscaled_nisab) 1535 plan = {} 1536 below_nisab = 0 1537 brief = [0, 0, 0] 1538 valid = False 1539 if debug: 1540 print('exchanges', self.exchanges()) 1541 for x in self._vault['account']: 1542 if not self.zakatable(x): 1543 continue 1544 _box = self._vault['account'][x]['box'] 1545 _log = self._vault['account'][x]['log'] 1546 limit = len(_box) + 1 1547 ids = sorted(self._vault['account'][x]['box'].keys()) 1548 for i in range(-1, -limit, -1): 1549 j = ids[i] 1550 rest = float(_box[j]['rest']) 1551 if rest <= 0: 1552 continue 1553 exchange = self.exchange(x, created=self.time()) 1554 rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1) 1555 brief[0] += rest 1556 index = limit + i - 1 1557 epoch = (now - j) / cycle 1558 if debug: 1559 print(f"Epoch: {epoch}", _box[j]) 1560 if _box[j]['last'] > 0: 1561 epoch = (now - _box[j]['last']) / cycle 1562 if debug: 1563 print(f"Epoch: {epoch}") 1564 epoch = floor(epoch) 1565 if debug: 1566 print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch) 1567 if epoch == 0: 1568 continue 1569 if debug: 1570 print("Epoch - PASSED") 1571 brief[1] += rest 1572 if rest >= nisab: 1573 total = 0 1574 for _ in range(epoch): 1575 total += ZakatTracker.ZakatCut(float(rest) - float(total)) 1576 if total > 0: 1577 if x not in plan: 1578 plan[x] = {} 1579 valid = True 1580 brief[2] += total 1581 plan[x][index] = { 1582 'total': total, 1583 'count': epoch, 1584 'box_time': j, 1585 'box_capital': _box[j]['capital'], 1586 'box_rest': _box[j]['rest'], 1587 'box_last': _box[j]['last'], 1588 'box_total': _box[j]['total'], 1589 'box_count': _box[j]['count'], 1590 'box_log': _log[j]['desc'], 1591 'exchange_rate': exchange['rate'], 1592 'exchange_time': exchange['time'], 1593 'exchange_desc': exchange['description'], 1594 } 1595 else: 1596 chunk = ZakatTracker.ZakatCut(float(rest)) 1597 if chunk > 0: 1598 if x not in plan: 1599 plan[x] = {} 1600 if j not in plan[x].keys(): 1601 plan[x][index] = {} 1602 below_nisab += rest 1603 brief[2] += chunk 1604 plan[x][index]['below_nisab'] = chunk 1605 plan[x][index]['total'] = chunk 1606 plan[x][index]['count'] = epoch 1607 plan[x][index]['box_time'] = j 1608 plan[x][index]['box_capital'] = _box[j]['capital'] 1609 plan[x][index]['box_rest'] = _box[j]['rest'] 1610 plan[x][index]['box_last'] = _box[j]['last'] 1611 plan[x][index]['box_total'] = _box[j]['total'] 1612 plan[x][index]['box_count'] = _box[j]['count'] 1613 plan[x][index]['box_log'] = _log[j]['desc'] 1614 plan[x][index]['exchange_rate'] = exchange['rate'] 1615 plan[x][index]['exchange_time'] = exchange['time'] 1616 plan[x][index]['exchange_desc'] = exchange['description'] 1617 valid = valid or below_nisab >= nisab 1618 if debug: 1619 print(f"below_nisab({below_nisab}) >= nisab({nisab})") 1620 return valid, brief, plan 1621 1622 def build_payment_parts(self, demand: float, positive_only: bool = True) -> dict: 1623 """ 1624 Build payment parts for the Zakat distribution. 1625 1626 Parameters: 1627 demand (float): The total demand for payment in local currency. 1628 positive_only (bool): If True, only consider accounts with positive balance. Default is True. 1629 1630 Returns: 1631 dict: A dictionary containing the payment parts for each account. The dictionary has the following structure: 1632 { 1633 'account': { 1634 'account_id': {'balance': float, 'rate': float, 'part': float}, 1635 ... 1636 }, 1637 'exceed': bool, 1638 'demand': float, 1639 'total': float, 1640 } 1641 """ 1642 total = 0 1643 parts = { 1644 'account': {}, 1645 'exceed': False, 1646 'demand': demand, 1647 } 1648 for x, y in self.accounts().items(): 1649 if positive_only and y <= 0: 1650 continue 1651 total += float(y) 1652 exchange = self.exchange(x) 1653 parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0} 1654 parts['total'] = total 1655 return parts 1656 1657 @staticmethod 1658 def check_payment_parts(parts: dict, debug: bool = False) -> int: 1659 """ 1660 Checks the validity of payment parts. 1661 1662 Parameters: 1663 parts (dict): A dictionary containing payment parts information. 1664 debug (bool): Flag to enable debug mode. 1665 1666 Returns: 1667 int: Returns 0 if the payment parts are valid, otherwise returns the error code. 1668 1669 Error Codes: 1670 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 1671 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 1672 3: 'part' value in parts['account'][x] is less than 0. 1673 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 1674 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 1675 6: The sum of 'part' values in parts['account'] does not match with 'demand' value. 1676 """ 1677 if debug: 1678 print('check_payment_parts', f'debug={debug}') 1679 for i in ['demand', 'account', 'total', 'exceed']: 1680 if i not in parts: 1681 return 1 1682 exceed = parts['exceed'] 1683 for x in parts['account']: 1684 for j in ['balance', 'rate', 'part']: 1685 if j not in parts['account'][x]: 1686 return 2 1687 if parts['account'][x]['part'] < 0: 1688 return 3 1689 if not exceed and parts['account'][x]['balance'] <= 0: 1690 return 4 1691 demand = parts['demand'] 1692 z = 0 1693 for _, y in parts['account'].items(): 1694 if not exceed and y['part'] > y['balance']: 1695 return 5 1696 z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1) 1697 z = round(z, 2) 1698 demand = round(demand, 2) 1699 if debug: 1700 print('check_payment_parts', f'z = {z}, demand = {demand}') 1701 print('check_payment_parts', type(z), type(demand)) 1702 print('check_payment_parts', z != demand) 1703 print('check_payment_parts', str(z) != str(demand)) 1704 if z != demand and str(z) != str(demand): 1705 return 6 1706 return 0 1707 1708 def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool: 1709 """ 1710 Perform Zakat calculation based on the given report and optional parts. 1711 1712 Parameters: 1713 report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. 1714 parts (dict): A dictionary containing the payment parts for the zakat. 1715 debug (bool): A flag indicating whether to print debug information. 1716 1717 Returns: 1718 bool: True if the zakat calculation is successful, False otherwise. 1719 """ 1720 if debug: 1721 print('zakat', f'debug={debug}') 1722 valid, _, plan = report 1723 if not valid: 1724 return valid 1725 parts_exist = parts is not None 1726 if parts_exist: 1727 if self.check_payment_parts(parts, debug=debug) != 0: 1728 return False 1729 if debug: 1730 print('######### zakat #######') 1731 print('parts_exist', parts_exist) 1732 no_lock = self.nolock() 1733 self.lock() 1734 report_time = self.time() 1735 self._vault['report'][report_time] = report 1736 self._step(Action.REPORT, ref=report_time) 1737 created = self.time() 1738 for x in plan: 1739 target_exchange = self.exchange(x) 1740 if debug: 1741 print(plan[x]) 1742 print('-------------') 1743 print(self._vault['account'][x]['box']) 1744 ids = sorted(self._vault['account'][x]['box'].keys()) 1745 if debug: 1746 print('plan[x]', plan[x]) 1747 for i in plan[x].keys(): 1748 j = ids[i] 1749 if debug: 1750 print('i', i, 'j', j) 1751 self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'], 1752 key='last', 1753 math_operation=MathOperation.EQUAL) 1754 self._vault['account'][x]['box'][j]['last'] = created 1755 amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate'])) 1756 self._vault['account'][x]['box'][j]['total'] += amount 1757 self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total', 1758 math_operation=MathOperation.ADDITION) 1759 self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count'] 1760 self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count', 1761 math_operation=MathOperation.ADDITION) 1762 if not parts_exist: 1763 try: 1764 self._vault['account'][x]['box'][j]['rest'] -= amount 1765 except TypeError: 1766 self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount) 1767 # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest', 1768 # math_operation=MathOperation.SUBTRACTION) 1769 self._log(-float(amount), desc='zakat-زكاة', account=x, created=None, ref=j, debug=debug) 1770 if parts_exist: 1771 for account, part in parts['account'].items(): 1772 if part['part'] == 0: 1773 continue 1774 if debug: 1775 print('zakat-part', account, part['rate']) 1776 target_exchange = self.exchange(account) 1777 amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate']) 1778 self.sub(amount, desc='zakat-part-دفعة-زكاة', account=account, debug=debug) 1779 if no_lock: 1780 self.free(self.lock()) 1781 return True 1782 1783 def export_json(self, path: str = "data.json") -> bool: 1784 """ 1785 Exports the current state of the ZakatTracker object to a JSON file. 1786 1787 Parameters: 1788 path (str): The path where the JSON file will be saved. Default is "data.json". 1789 1790 Returns: 1791 bool: True if the export is successful, False otherwise. 1792 1793 Raises: 1794 No specific exceptions are raised by this method. 1795 """ 1796 with open(path, "w") as file: 1797 json.dump(self._vault, file, indent=4, cls=JSONEncoder) 1798 return True 1799 1800 def save(self, path: str = None) -> bool: 1801 """ 1802 Saves the ZakatTracker's current state to a camel file. 1803 1804 This method serializes the internal data (`_vault`). 1805 1806 Parameters: 1807 path (str, optional): File path for saving. Defaults to a predefined location. 1808 1809 Returns: 1810 bool: True if the save operation is successful, False otherwise. 1811 """ 1812 if path is None: 1813 path = self.path() 1814 with open(f'{path}.tmp', 'w') as stream: 1815 # first save in tmp file 1816 stream.write(camel.dump(self._vault)) 1817 # then move tmp file to original location 1818 shutil.move(f'{path}.tmp', path) 1819 return True 1820 1821 def load(self, path: str = None) -> bool: 1822 """ 1823 Load the current state of the ZakatTracker object from a camel file. 1824 1825 Parameters: 1826 path (str): The path where the camel file is located. If not provided, it will use the default path. 1827 1828 Returns: 1829 bool: True if the load operation is successful, False otherwise. 1830 """ 1831 if path is None: 1832 path = self.path() 1833 if os.path.exists(path): 1834 with open(path, 'r') as stream: 1835 self._vault = camel.load(stream.read()) 1836 return True 1837 return False 1838 1839 def import_csv_cache_path(self): 1840 """ 1841 Generates the cache file path for imported CSV data. 1842 1843 This function constructs the file path where cached data from CSV imports 1844 will be stored. The cache file is a camel file (.camel extension) appended 1845 to the base path of the object. 1846 1847 Returns: 1848 str: The full path to the import CSV cache file. 1849 1850 Example: 1851 >>> obj = ZakatTracker('/data/reports') 1852 >>> obj.import_csv_cache_path() 1853 '/data/reports.import_csv.camel' 1854 """ 1855 path = str(self.path()) 1856 ext = self.ext() 1857 ext_len = len(ext) 1858 if path.endswith(f'.{ext}'): 1859 path = path[:-ext_len-1] 1860 _, filename = os.path.split(path + f'.import_csv.{ext}') 1861 return self.base_path(filename) 1862 1863 def import_csv(self, path: str = 'file.csv', scale_decimal_places: int = 0, debug: bool = False) -> tuple: 1864 """ 1865 The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system. 1866 1867 Parameters: 1868 path (str): The path to the CSV file. Default is 'file.csv'. 1869 scale_decimal_places (int): The number of decimal places to scale the value. Default is 0. 1870 debug (bool): A flag indicating whether to print debug information. 1871 1872 Returns: 1873 tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, 1874 and a dictionary of bad transactions. 1875 1876 Notes: 1877 * Currency Pair Assumption: This function assumes that the exchange rates stored for each account 1878 are appropriate for the currency pairs involved in the conversions. 1879 * The exchange rate for each account is based on the last encountered transaction rate that is not equal 1880 to 1.0 or the previous rate for that account. 1881 * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent 1882 transactions of the same account within the whole imported and existing dataset when doing `check` and 1883 `zakat` operations. 1884 1885 Example Usage: 1886 The CSV file should have the following format, rate is optional per transaction: 1887 account, desc, value, date, rate 1888 For example: 1889 safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1 1890 """ 1891 if debug: 1892 print('import_csv', f'debug={debug}') 1893 cache: list[int] = [] 1894 try: 1895 with open(self.import_csv_cache_path(), 'r') as stream: 1896 cache = camel.load(stream.read()) 1897 except: 1898 pass 1899 date_formats = [ 1900 "%Y-%m-%d %H:%M:%S", 1901 "%Y-%m-%dT%H:%M:%S", 1902 "%Y-%m-%dT%H%M%S", 1903 "%Y-%m-%d", 1904 ] 1905 created, found, bad = 0, 0, {} 1906 data: dict[int, list] = {} 1907 with open(path, newline='', encoding="utf-8") as f: 1908 i = 0 1909 for row in csv.reader(f, delimiter=','): 1910 i += 1 1911 hashed = hash(tuple(row)) 1912 if hashed in cache: 1913 found += 1 1914 continue 1915 account = row[0] 1916 desc = row[1] 1917 value = float(row[2]) 1918 rate = 1.0 1919 if row[4:5]: # Empty list if index is out of range 1920 rate = float(row[4]) 1921 date: int = 0 1922 for time_format in date_formats: 1923 try: 1924 date = self.time(datetime.datetime.strptime(row[3], time_format)) 1925 break 1926 except: 1927 pass 1928 # TODO: not allowed for negative dates in the future after enhance time functions 1929 if date == 0: 1930 bad[i] = row + ['invalid date'] 1931 if value == 0: 1932 bad[i] = row + ['invalid value'] 1933 continue 1934 if date not in data: 1935 data[date] = [] 1936 data[date].append((i, account, desc, value, date, rate, hashed)) 1937 1938 if debug: 1939 print('import_csv', len(data)) 1940 1941 if bad: 1942 return created, found, bad 1943 1944 for date, rows in sorted(data.items()): 1945 try: 1946 len_rows = len(rows) 1947 if len_rows == 1: 1948 (_, account, desc, unscaled_value, date, rate, hashed) = rows[0] 1949 value = self.unscale(unscaled_value, decimal_places=scale_decimal_places) if scale_decimal_places > 0 else unscaled_value 1950 if rate > 0: 1951 self.exchange(account=account, created=date, rate=rate) 1952 if value > 0: 1953 self.track(unscaled_value=value, desc=desc, account=account, logging=True, created=date) 1954 elif value < 0: 1955 self.sub(unscaled_value=-value, desc=desc, account=account, created=date) 1956 created += 1 1957 cache.append(hashed) 1958 continue 1959 if debug: 1960 print('-- Duplicated time detected', date, 'len', len_rows) 1961 print(rows) 1962 print('---------------------------------') 1963 # If records are found at the same time with different accounts in the same amount 1964 # (one positive and the other negative), this indicates it is a transfer. 1965 if len_rows != 2: 1966 raise Exception(f'more than two transactions({len_rows}) at the same time') 1967 (i, account1, desc1, unscaled_value1, date1, rate1, _) = rows[0] 1968 (j, account2, desc2, unscaled_value2, date2, rate2, _) = rows[1] 1969 if account1 == account2 or desc1 != desc2 or abs(unscaled_value1) != abs(unscaled_value2) or date1 != date2: 1970 raise Exception('invalid transfer') 1971 if rate1 > 0: 1972 self.exchange(account1, created=date1, rate=rate1) 1973 if rate2 > 0: 1974 self.exchange(account2, created=date2, rate=rate2) 1975 value1 = self.unscale(unscaled_value1, decimal_places=scale_decimal_places) if scale_decimal_places > 0 else unscaled_value1 1976 value2 = self.unscale(unscaled_value2, decimal_places=scale_decimal_places) if scale_decimal_places > 0 else unscaled_value2 1977 values = { 1978 value1: account1, 1979 value2: account2, 1980 } 1981 self.transfer( 1982 unscaled_amount=abs(value1), 1983 from_account=values[min(values.keys())], 1984 to_account=values[max(values.keys())], 1985 desc=desc1, 1986 created=date1, 1987 ) 1988 except Exception as e: 1989 for (i, account, desc, value, date, rate, _) in rows: 1990 bad[i] = (account, desc, value, date, rate, e) 1991 break 1992 with open(self.import_csv_cache_path(), 'w') as stream: 1993 stream.write(camel.dump(cache)) 1994 return created, found, bad 1995 1996 ######## 1997 # TESTS # 1998 ####### 1999 2000 @staticmethod 2001 def human_readable_size(size: float, decimal_places: int = 2) -> str: 2002 """ 2003 Converts a size in bytes to a human-readable format (e.g., KB, MB, GB). 2004 2005 This function iterates through progressively larger units of information 2006 (B, KB, MB, GB, etc.) and divides the input size until it fits within a 2007 range that can be expressed with a reasonable number before the unit. 2008 2009 Parameters: 2010 size (float): The size in bytes to convert. 2011 decimal_places (int, optional): The number of decimal places to display 2012 in the result. Defaults to 2. 2013 2014 Returns: 2015 str: A string representation of the size in a human-readable format, 2016 rounded to the specified number of decimal places. For example: 2017 - "1.50 KB" (1536 bytes) 2018 - "23.00 MB" (24117248 bytes) 2019 - "1.23 GB" (1325899906 bytes) 2020 """ 2021 if type(size) not in (float, int): 2022 raise TypeError("size must be a float or integer") 2023 if type(decimal_places) != int: 2024 raise TypeError("decimal_places must be an integer") 2025 for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: 2026 if size < 1024.0: 2027 break 2028 size /= 1024.0 2029 return f"{size:.{decimal_places}f} {unit}" 2030 2031 @staticmethod 2032 def get_dict_size(obj: dict, seen: set = None) -> float: 2033 """ 2034 Recursively calculates the approximate memory size of a dictionary and its contents in bytes. 2035 2036 This function traverses the dictionary structure, accounting for the size of keys, values, 2037 and any nested objects. It handles various data types commonly found in dictionaries 2038 (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case 2039 of circular references. 2040 2041 Parameters: 2042 obj (dict): The dictionary whose size is to be calculated. 2043 seen (set, optional): A set used internally to track visited objects 2044 and avoid circular references. Defaults to None. 2045 2046 Returns: 2047 float: An approximate size of the dictionary and its contents in bytes. 2048 2049 Note: 2050 - This function is a method of the `ZakatTracker` class and is likely used to 2051 estimate the memory footprint of data structures relevant to Zakat calculations. 2052 - The size calculation is approximate as it relies on `sys.getsizeof()`, which might 2053 not account for all memory overhead depending on the Python implementation. 2054 - Circular references are handled to prevent infinite recursion. 2055 - Basic numeric types (int, float, complex) are assumed to have fixed sizes. 2056 - String sizes are estimated based on character length and encoding. 2057 """ 2058 size = 0 2059 if seen is None: 2060 seen = set() 2061 2062 obj_id = id(obj) 2063 if obj_id in seen: 2064 return 0 2065 2066 seen.add(obj_id) 2067 size += sys.getsizeof(obj) 2068 2069 if isinstance(obj, dict): 2070 for k, v in obj.items(): 2071 size += ZakatTracker.get_dict_size(k, seen) 2072 size += ZakatTracker.get_dict_size(v, seen) 2073 elif isinstance(obj, (list, tuple, set, frozenset)): 2074 for item in obj: 2075 size += ZakatTracker.get_dict_size(item, seen) 2076 elif isinstance(obj, (int, float, complex)): # Handle numbers 2077 pass # Basic numbers have a fixed size, so nothing to add here 2078 elif isinstance(obj, str): # Handle strings 2079 size += len(obj) * sys.getsizeof(str().encode()) # Size per character in bytes 2080 return size 2081 2082 @staticmethod 2083 def duration_from_nanoseconds(ns: int, 2084 show_zeros_in_spoken_time: bool = False, 2085 spoken_time_separator=',', 2086 millennia: str = 'Millennia', 2087 century: str = 'Century', 2088 years: str = 'Years', 2089 days: str = 'Days', 2090 hours: str = 'Hours', 2091 minutes: str = 'Minutes', 2092 seconds: str = 'Seconds', 2093 milli_seconds: str = 'MilliSeconds', 2094 micro_seconds: str = 'MicroSeconds', 2095 nano_seconds: str = 'NanoSeconds', 2096 ) -> tuple: 2097 """ 2098 REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 2099 Convert NanoSeconds to Human Readable Time Format. 2100 A NanoSeconds is a unit of time in the International System of Units (SI) equal 2101 to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second. 2102 Its symbol is μs, sometimes simplified to us when Unicode is not available. 2103 A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond. 2104 2105 INPUT : ms (AKA: MilliSeconds) 2106 OUTPUT: tuple(string time_lapsed, string spoken_time) like format. 2107 OUTPUT Variables: time_lapsed, spoken_time 2108 2109 Example Input: duration_from_nanoseconds(ns) 2110 **"Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds"** 2111 Example Output: ('039:0001:047:325:05:02:03:456:789:012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds') 2112 duration_from_nanoseconds(1234567890123456789012) 2113 """ 2114 us, ns = divmod(ns, 1000) 2115 ms, us = divmod(us, 1000) 2116 s, ms = divmod(ms, 1000) 2117 m, s = divmod(s, 60) 2118 h, m = divmod(m, 60) 2119 d, h = divmod(h, 24) 2120 y, d = divmod(d, 365) 2121 c, y = divmod(y, 100) 2122 n, c = divmod(c, 10) 2123 time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}" 2124 spoken_time_part = [] 2125 if n > 0 or show_zeros_in_spoken_time: 2126 spoken_time_part.append(f"{n: 3d} {millennia}") 2127 if c > 0 or show_zeros_in_spoken_time: 2128 spoken_time_part.append(f"{c: 4d} {century}") 2129 if y > 0 or show_zeros_in_spoken_time: 2130 spoken_time_part.append(f"{y: 3d} {years}") 2131 if d > 0 or show_zeros_in_spoken_time: 2132 spoken_time_part.append(f"{d: 4d} {days}") 2133 if h > 0 or show_zeros_in_spoken_time: 2134 spoken_time_part.append(f"{h: 2d} {hours}") 2135 if m > 0 or show_zeros_in_spoken_time: 2136 spoken_time_part.append(f"{m: 2d} {minutes}") 2137 if s > 0 or show_zeros_in_spoken_time: 2138 spoken_time_part.append(f"{s: 2d} {seconds}") 2139 if ms > 0 or show_zeros_in_spoken_time: 2140 spoken_time_part.append(f"{ms: 3d} {milli_seconds}") 2141 if us > 0 or show_zeros_in_spoken_time: 2142 spoken_time_part.append(f"{us: 3d} {micro_seconds}") 2143 if ns > 0 or show_zeros_in_spoken_time: 2144 spoken_time_part.append(f"{ns: 3d} {nano_seconds}") 2145 return time_lapsed, spoken_time_separator.join(spoken_time_part) 2146 2147 @staticmethod 2148 def day_to_time(day: int, month: int = 6, year: int = 2024) -> int: # افتراض أن الشهر هو يونيو والسنة 2024 2149 """ 2150 Convert a specific day, month, and year into a timestamp. 2151 2152 Parameters: 2153 day (int): The day of the month. 2154 month (int): The month of the year. Default is 6 (June). 2155 year (int): The year. Default is 2024. 2156 2157 Returns: 2158 int: The timestamp representing the given day, month, and year. 2159 2160 Note: 2161 This method assumes the default month and year if not provided. 2162 """ 2163 return ZakatTracker.time(datetime.datetime(year, month, day)) 2164 2165 @staticmethod 2166 def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime: 2167 """ 2168 Generate a random date between two given dates. 2169 2170 Parameters: 2171 start_date (datetime.datetime): The start date from which to generate a random date. 2172 end_date (datetime.datetime): The end date until which to generate a random date. 2173 2174 Returns: 2175 datetime.datetime: A random date between the start_date and end_date. 2176 """ 2177 time_between_dates = end_date - start_date 2178 days_between_dates = time_between_dates.days 2179 random_number_of_days = random.randrange(days_between_dates) 2180 return start_date + datetime.timedelta(days=random_number_of_days) 2181 2182 @staticmethod 2183 def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False, 2184 debug: bool = False) -> int: 2185 """ 2186 Generate a random CSV file with specified parameters. 2187 2188 Parameters: 2189 path (str): The path where the CSV file will be saved. Default is "data.csv". 2190 count (int): The number of rows to generate in the CSV file. Default is 1000. 2191 with_rate (bool): If True, a random rate between 1.2% and 12% is added. Default is False. 2192 debug (bool): A flag indicating whether to print debug information. 2193 2194 Returns: 2195 None. The function generates a CSV file at the specified path with the given count of rows. 2196 Each row contains a randomly generated account, description, value, and date. 2197 The value is randomly generated between 1000 and 100000, 2198 and the date is randomly generated between 1950-01-01 and 2023-12-31. 2199 If the row number is not divisible by 13, the value is multiplied by -1. 2200 """ 2201 if debug: 2202 print('generate_random_csv_file', f'debug={debug}') 2203 i = 0 2204 with open(path, "w", newline="") as csvfile: 2205 writer = csv.writer(csvfile) 2206 for i in range(count): 2207 account = f"acc-{random.randint(1, 1000)}" 2208 desc = f"Some text {random.randint(1, 1000)}" 2209 value = random.randint(1000, 100000) 2210 date = ZakatTracker.generate_random_date(datetime.datetime(1000, 1, 1), 2211 datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S") 2212 if not i % 13 == 0: 2213 value *= -1 2214 row = [account, desc, value, date] 2215 if with_rate: 2216 rate = random.randint(1, 100) * 0.12 2217 if debug: 2218 print('before-append', row) 2219 row.append(rate) 2220 if debug: 2221 print('after-append', row) 2222 writer.writerow(row) 2223 i = i + 1 2224 return i 2225 2226 @staticmethod 2227 def create_random_list(max_sum, min_value=0, max_value=10): 2228 """ 2229 Creates a list of random integers whose sum does not exceed the specified maximum. 2230 2231 Args: 2232 max_sum: The maximum allowed sum of the list elements. 2233 min_value: The minimum possible value for an element (inclusive). 2234 max_value: The maximum possible value for an element (inclusive). 2235 2236 Returns: 2237 A list of random integers. 2238 """ 2239 result = [] 2240 current_sum = 0 2241 2242 while current_sum < max_sum: 2243 # Calculate the remaining space for the next element 2244 remaining_sum = max_sum - current_sum 2245 # Determine the maximum possible value for the next element 2246 next_max_value = min(remaining_sum, max_value) 2247 # Generate a random element within the allowed range 2248 next_element = random.randint(min_value, next_max_value) 2249 result.append(next_element) 2250 current_sum += next_element 2251 2252 return result 2253 2254 def _test_core(self, restore=False, debug=False): 2255 2256 if debug: 2257 random.seed(1234567890) 2258 2259 # sanity check - random forward time 2260 2261 xlist = [] 2262 limit = 1000 2263 for _ in range(limit): 2264 y = ZakatTracker.time() 2265 z = '-' 2266 if y not in xlist: 2267 xlist.append(y) 2268 else: 2269 z = 'x' 2270 if debug: 2271 print(z, y) 2272 xx = len(xlist) 2273 if debug: 2274 print('count', xx, ' - unique: ', (xx / limit) * 100, '%') 2275 assert limit == xx 2276 2277 # sanity check - convert date since 1000AD 2278 2279 for year in range(1000, 9000): 2280 ns = ZakatTracker.time(datetime.datetime.strptime(f"{year}-12-30 18:30:45", "%Y-%m-%d %H:%M:%S")) 2281 date = ZakatTracker.time_to_datetime(ns) 2282 if debug: 2283 print(date) 2284 assert date.year == year 2285 assert date.month == 12 2286 assert date.day == 30 2287 assert date.hour == 18 2288 assert date.minute == 30 2289 assert date.second in [44, 45] 2290 2291 # human_readable_size 2292 2293 assert ZakatTracker.human_readable_size(0) == "0.00 B" 2294 assert ZakatTracker.human_readable_size(512) == "512.00 B" 2295 assert ZakatTracker.human_readable_size(1023) == "1023.00 B" 2296 2297 assert ZakatTracker.human_readable_size(1024) == "1.00 KB" 2298 assert ZakatTracker.human_readable_size(2048) == "2.00 KB" 2299 assert ZakatTracker.human_readable_size(5120) == "5.00 KB" 2300 2301 assert ZakatTracker.human_readable_size(1024 ** 2) == "1.00 MB" 2302 assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2) == "2.50 MB" 2303 2304 assert ZakatTracker.human_readable_size(1024 ** 3) == "1.00 GB" 2305 assert ZakatTracker.human_readable_size(1024 ** 4) == "1.00 TB" 2306 assert ZakatTracker.human_readable_size(1024 ** 5) == "1.00 PB" 2307 2308 assert ZakatTracker.human_readable_size(1536, decimal_places=0) == "2 KB" 2309 assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2, decimal_places=1) == "2.5 MB" 2310 assert ZakatTracker.human_readable_size(1234567890, decimal_places=3) == "1.150 GB" 2311 2312 try: 2313 # noinspection PyTypeChecker 2314 ZakatTracker.human_readable_size("not a number") 2315 assert False, "Expected TypeError for invalid input" 2316 except TypeError: 2317 pass 2318 2319 try: 2320 # noinspection PyTypeChecker 2321 ZakatTracker.human_readable_size(1024, decimal_places="not an int") 2322 assert False, "Expected TypeError for invalid decimal_places" 2323 except TypeError: 2324 pass 2325 2326 # get_dict_size 2327 assert ZakatTracker.get_dict_size({}) == sys.getsizeof({}), "Empty dictionary size mismatch" 2328 assert ZakatTracker.get_dict_size({"a": 1, "b": 2.5, "c": True}) != sys.getsizeof({}), "Not Empty dictionary" 2329 2330 # number scale 2331 error = 0 2332 total = 0 2333 for sign in ['', '-']: 2334 for max_i, max_j, decimal_places in [ 2335 (101, 101, 2), # fiat currency minimum unit took 2 decimal places 2336 (1, 1_000, 8), # cryptocurrency like Satoshi in Bitcoin took 8 decimal places 2337 (1, 1_000, 18) # cryptocurrency like Wei in Ethereum took 18 decimal places 2338 ]: 2339 for return_type in ( 2340 float, 2341 Decimal, 2342 ): 2343 for i in range(max_i): 2344 for j in range(max_j): 2345 total += 1 2346 num_str = f'{sign}{i}.{j:0{decimal_places}d}' 2347 num = return_type(num_str) 2348 scaled = self.scale(num, decimal_places=decimal_places) 2349 unscaled = self.unscale(scaled, return_type=return_type, decimal_places=decimal_places) 2350 if debug: 2351 print( 2352 f'return_type: {return_type}, num_str: {num_str} - num: {num} - scaled: {scaled} - unscaled: {unscaled}') 2353 if unscaled != num: 2354 if debug: 2355 print('***** SCALE ERROR *****') 2356 error += 1 2357 if debug: 2358 print(f'total: {total}, error({error}): {100 * error / total}%') 2359 assert error == 0 2360 2361 assert self.nolock() 2362 assert self._history() is True 2363 2364 table = { 2365 1: [ 2366 (0, 10, 1000, 1000, 1000, 1, 1), 2367 (0, 20, 3000, 3000, 3000, 2, 2), 2368 (0, 30, 6000, 6000, 6000, 3, 3), 2369 (1, 15, 4500, 4500, 4500, 3, 4), 2370 (1, 50, -500, -500, -500, 4, 5), 2371 (1, 100, -10500, -10500, -10500, 5, 6), 2372 ], 2373 'wallet': [ 2374 (1, 90, -9000, -9000, -9000, 1, 1), 2375 (0, 100, 1000, 1000, 1000, 2, 2), 2376 (1, 190, -18000, -18000, -18000, 3, 3), 2377 (0, 1000, 82000, 82000, 82000, 4, 4), 2378 ], 2379 } 2380 for x in table: 2381 for y in table[x]: 2382 self.lock() 2383 if y[0] == 0: 2384 ref = self.track( 2385 unscaled_value=y[1], 2386 desc='test-add', 2387 account=x, 2388 logging=True, 2389 created=ZakatTracker.time(), 2390 debug=debug, 2391 ) 2392 else: 2393 (ref, z) = self.sub( 2394 unscaled_value=y[1], 2395 desc='test-sub', 2396 account=x, 2397 created=ZakatTracker.time(), 2398 ) 2399 if debug: 2400 print('_sub', z, ZakatTracker.time()) 2401 assert ref != 0 2402 assert len(self._vault['account'][x]['log'][ref]['file']) == 0 2403 for i in range(3): 2404 file_ref = self.add_file(x, ref, 'file_' + str(i)) 2405 sleep(0.0000001) 2406 assert file_ref != 0 2407 if debug: 2408 print('ref', ref, 'file', file_ref) 2409 assert len(self._vault['account'][x]['log'][ref]['file']) == i + 1 2410 file_ref = self.add_file(x, ref, 'file_' + str(3)) 2411 assert self.remove_file(x, ref, file_ref) 2412 z = self.balance(x) 2413 if debug: 2414 print("debug-0", z, y) 2415 assert z == y[2] 2416 z = self.balance(x, False) 2417 if debug: 2418 print("debug-1", z, y[3]) 2419 assert z == y[3] 2420 o = self._vault['account'][x]['log'] 2421 z = 0 2422 for i in o: 2423 z += o[i]['value'] 2424 if debug: 2425 print("debug-2", z, type(z)) 2426 print("debug-2", y[4], type(y[4])) 2427 assert z == y[4] 2428 if debug: 2429 print('debug-2 - PASSED') 2430 assert self.box_size(x) == y[5] 2431 assert self.log_size(x) == y[6] 2432 assert not self.nolock() 2433 self.free(self.lock()) 2434 assert self.nolock() 2435 assert self.boxes(x) != {} 2436 assert self.logs(x) != {} 2437 2438 assert not self.hide(x) 2439 assert self.hide(x, False) is False 2440 assert self.hide(x) is False 2441 assert self.hide(x, True) 2442 assert self.hide(x) 2443 2444 assert self.zakatable(x) 2445 assert self.zakatable(x, False) is False 2446 assert self.zakatable(x) is False 2447 assert self.zakatable(x, True) 2448 assert self.zakatable(x) 2449 2450 if restore is True: 2451 count = len(self._vault['history']) 2452 if debug: 2453 print('history-count', count) 2454 assert count == 10 2455 # try mode 2456 for _ in range(count): 2457 assert self.recall(True, debug) 2458 count = len(self._vault['history']) 2459 if debug: 2460 print('history-count', count) 2461 assert count == 10 2462 _accounts = list(table.keys()) 2463 accounts_limit = len(_accounts) + 1 2464 for i in range(-1, -accounts_limit, -1): 2465 account = _accounts[i] 2466 if debug: 2467 print(account, len(table[account])) 2468 transaction_limit = len(table[account]) + 1 2469 for j in range(-1, -transaction_limit, -1): 2470 row = table[account][j] 2471 if debug: 2472 print(row, self.balance(account), self.balance(account, False)) 2473 assert self.balance(account) == self.balance(account, False) 2474 assert self.balance(account) == row[2] 2475 assert self.recall(False, debug) 2476 assert self.recall(False, debug) is False 2477 count = len(self._vault['history']) 2478 if debug: 2479 print('history-count', count) 2480 assert count == 0 2481 self.reset() 2482 2483 def test(self, debug: bool = False) -> bool: 2484 if debug: 2485 print('test', f'debug={debug}') 2486 try: 2487 2488 self._test_core(True, debug) 2489 self._test_core(False, debug) 2490 2491 assert self._history() 2492 2493 # Not allowed for duplicate transactions in the same account and time 2494 2495 created = ZakatTracker.time() 2496 self.track(100, 'test-1', 'same', True, created) 2497 failed = False 2498 try: 2499 self.track(50, 'test-1', 'same', True, created) 2500 except: 2501 failed = True 2502 assert failed is True 2503 2504 self.reset() 2505 2506 # Same account transfer 2507 for x in [1, 'a', True, 1.8, None]: 2508 failed = False 2509 try: 2510 self.transfer(1, x, x, 'same-account', debug=debug) 2511 except: 2512 failed = True 2513 assert failed is True 2514 2515 # Always preserve box age during transfer 2516 2517 series: list[tuple] = [ 2518 (30, 4), 2519 (60, 3), 2520 (90, 2), 2521 ] 2522 case = { 2523 3000: { 2524 'series': series, 2525 'rest': 15000, 2526 }, 2527 6000: { 2528 'series': series, 2529 'rest': 12000, 2530 }, 2531 9000: { 2532 'series': series, 2533 'rest': 9000, 2534 }, 2535 18000: { 2536 'series': series, 2537 'rest': 0, 2538 }, 2539 27000: { 2540 'series': series, 2541 'rest': -9000, 2542 }, 2543 36000: { 2544 'series': series, 2545 'rest': -18000, 2546 }, 2547 } 2548 2549 selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle() 2550 2551 for total in case: 2552 if debug: 2553 print('--------------------------------------------------------') 2554 print(f'case[{total}]', case[total]) 2555 for x in case[total]['series']: 2556 self.track( 2557 unscaled_value=x[0], 2558 desc=f"test-{x} ages", 2559 account='ages', 2560 logging=True, 2561 created=selected_time * x[1], 2562 ) 2563 2564 unscaled_total = self.unscale(total) 2565 if debug: 2566 print('unscaled_total', unscaled_total) 2567 refs = self.transfer( 2568 unscaled_amount=unscaled_total, 2569 from_account='ages', 2570 to_account='future', 2571 desc='Zakat Movement', 2572 debug=debug, 2573 ) 2574 2575 if debug: 2576 print('refs', refs) 2577 2578 ages_cache_balance = self.balance('ages') 2579 ages_fresh_balance = self.balance('ages', False) 2580 rest = case[total]['rest'] 2581 if debug: 2582 print('source', ages_cache_balance, ages_fresh_balance, rest) 2583 assert ages_cache_balance == rest 2584 assert ages_fresh_balance == rest 2585 2586 future_cache_balance = self.balance('future') 2587 future_fresh_balance = self.balance('future', False) 2588 if debug: 2589 print('target', future_cache_balance, future_fresh_balance, total) 2590 print('refs', refs) 2591 assert future_cache_balance == total 2592 assert future_fresh_balance == total 2593 2594 # TODO: check boxes times for `ages` should equal box times in `future` 2595 for ref in self._vault['account']['ages']['box']: 2596 ages_capital = self._vault['account']['ages']['box'][ref]['capital'] 2597 ages_rest = self._vault['account']['ages']['box'][ref]['rest'] 2598 future_capital = 0 2599 if ref in self._vault['account']['future']['box']: 2600 future_capital = self._vault['account']['future']['box'][ref]['capital'] 2601 future_rest = 0 2602 if ref in self._vault['account']['future']['box']: 2603 future_rest = self._vault['account']['future']['box'][ref]['rest'] 2604 if ages_capital != 0 and future_capital != 0 and future_rest != 0: 2605 if debug: 2606 print('================================================================') 2607 print('ages', ages_capital, ages_rest) 2608 print('future', future_capital, future_rest) 2609 if ages_rest == 0: 2610 assert ages_capital == future_capital 2611 elif ages_rest < 0: 2612 assert -ages_capital == future_capital 2613 elif ages_rest > 0: 2614 assert ages_capital == ages_rest + future_capital 2615 self.reset() 2616 assert len(self._vault['history']) == 0 2617 2618 assert self._history() 2619 assert self._history(False) is False 2620 assert self._history() is False 2621 assert self._history(True) 2622 assert self._history() 2623 if debug: 2624 print('####################################################################') 2625 2626 transaction = [ 2627 ( 2628 20, 'wallet', 1, -2000, -2000, -2000, 1, 1, 2629 2000, 2000, 2000, 1, 1, 2630 ), 2631 ( 2632 750, 'wallet', 'safe', -77000, -77000, -77000, 2, 2, 2633 75000, 75000, 75000, 1, 1, 2634 ), 2635 ( 2636 600, 'safe', 'bank', 15000, 15000, 15000, 1, 2, 2637 60000, 60000, 60000, 1, 1, 2638 ), 2639 ] 2640 for z in transaction: 2641 self.lock() 2642 x = z[1] 2643 y = z[2] 2644 self.transfer( 2645 unscaled_amount=z[0], 2646 from_account=x, 2647 to_account=y, 2648 desc='test-transfer', 2649 debug=debug, 2650 ) 2651 zz = self.balance(x) 2652 if debug: 2653 print(zz, z) 2654 assert zz == z[3] 2655 xx = self.accounts()[x] 2656 assert xx == z[3] 2657 assert self.balance(x, False) == z[4] 2658 assert xx == z[4] 2659 2660 s = 0 2661 log = self._vault['account'][x]['log'] 2662 for i in log: 2663 s += log[i]['value'] 2664 if debug: 2665 print('s', s, 'z[5]', z[5]) 2666 assert s == z[5] 2667 2668 assert self.box_size(x) == z[6] 2669 assert self.log_size(x) == z[7] 2670 2671 yy = self.accounts()[y] 2672 assert self.balance(y) == z[8] 2673 assert yy == z[8] 2674 assert self.balance(y, False) == z[9] 2675 assert yy == z[9] 2676 2677 s = 0 2678 log = self._vault['account'][y]['log'] 2679 for i in log: 2680 s += log[i]['value'] 2681 assert s == z[10] 2682 2683 assert self.box_size(y) == z[11] 2684 assert self.log_size(y) == z[12] 2685 assert self.free(self.lock()) 2686 2687 if debug: 2688 pp().pprint(self.check(2.17)) 2689 2690 assert not self.nolock() 2691 history_count = len(self._vault['history']) 2692 if debug: 2693 print('history-count', history_count) 2694 assert history_count == 4 2695 assert not self.free(ZakatTracker.time()) 2696 assert self.free(self.lock()) 2697 assert self.nolock() 2698 assert len(self._vault['history']) == 3 2699 2700 # storage 2701 2702 _path = self.path(f'./zakat_test_db/test.{self.ext()}') 2703 if os.path.exists(_path): 2704 os.remove(_path) 2705 self.save() 2706 assert os.path.getsize(_path) > 0 2707 self.reset() 2708 assert self.recall(False, debug) is False 2709 self.load() 2710 assert self._vault['account'] is not None 2711 2712 # recall 2713 2714 assert self.nolock() 2715 assert len(self._vault['history']) == 3 2716 assert self.recall(False, debug) is True 2717 assert len(self._vault['history']) == 2 2718 assert self.recall(False, debug) is True 2719 assert len(self._vault['history']) == 1 2720 assert self.recall(False, debug) is True 2721 assert len(self._vault['history']) == 0 2722 assert self.recall(False, debug) is False 2723 assert len(self._vault['history']) == 0 2724 2725 # exchange 2726 2727 self.exchange("cash", 25, 3.75, "2024-06-25") 2728 self.exchange("cash", 22, 3.73, "2024-06-22") 2729 self.exchange("cash", 15, 3.69, "2024-06-15") 2730 self.exchange("cash", 10, 3.66) 2731 2732 for i in range(1, 30): 2733 exchange = self.exchange("cash", i) 2734 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2735 if debug: 2736 print(i, rate, description, created) 2737 assert created 2738 if i < 10: 2739 assert rate == 1 2740 assert description is None 2741 elif i == 10: 2742 assert rate == 3.66 2743 assert description is None 2744 elif i < 15: 2745 assert rate == 3.66 2746 assert description is None 2747 elif i == 15: 2748 assert rate == 3.69 2749 assert description is not None 2750 elif i < 22: 2751 assert rate == 3.69 2752 assert description is not None 2753 elif i == 22: 2754 assert rate == 3.73 2755 assert description is not None 2756 elif i >= 25: 2757 assert rate == 3.75 2758 assert description is not None 2759 exchange = self.exchange("bank", i) 2760 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2761 if debug: 2762 print(i, rate, description, created) 2763 assert created 2764 assert rate == 1 2765 assert description is None 2766 2767 assert len(self._vault['exchange']) > 0 2768 assert len(self.exchanges()) > 0 2769 self._vault['exchange'].clear() 2770 assert len(self._vault['exchange']) == 0 2771 assert len(self.exchanges()) == 0 2772 2773 # حفظ أسعار الصرف باستخدام التواريخ بالنانو ثانية 2774 self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25") 2775 self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22") 2776 self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15") 2777 self.exchange("cash", ZakatTracker.day_to_time(10), 3.66) 2778 2779 for i in [x * 0.12 for x in range(-15, 21)]: 2780 if i <= 0: 2781 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0 2782 else: 2783 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0 2784 2785 # اختبار النتائج باستخدام التواريخ بالنانو ثانية 2786 for i in range(1, 31): 2787 timestamp_ns = ZakatTracker.day_to_time(i) 2788 exchange = self.exchange("cash", timestamp_ns) 2789 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2790 if debug: 2791 print(i, rate, description, created) 2792 assert created 2793 if i < 10: 2794 assert rate == 1 2795 assert description is None 2796 elif i == 10: 2797 assert rate == 3.66 2798 assert description is None 2799 elif i < 15: 2800 assert rate == 3.66 2801 assert description is None 2802 elif i == 15: 2803 assert rate == 3.69 2804 assert description is not None 2805 elif i < 22: 2806 assert rate == 3.69 2807 assert description is not None 2808 elif i == 22: 2809 assert rate == 3.73 2810 assert description is not None 2811 elif i >= 25: 2812 assert rate == 3.75 2813 assert description is not None 2814 exchange = self.exchange("bank", i) 2815 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2816 if debug: 2817 print(i, rate, description, created) 2818 assert created 2819 assert rate == 1 2820 assert description is None 2821 2822 # csv 2823 2824 csv_count = 1000 2825 2826 for with_rate, path in { 2827 False: 'test-import_csv-no-exchange', 2828 True: 'test-import_csv-with-exchange', 2829 }.items(): 2830 2831 if debug: 2832 print('test_import_csv', with_rate, path) 2833 2834 csv_path = path + '.csv' 2835 if os.path.exists(csv_path): 2836 os.remove(csv_path) 2837 c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug) 2838 if debug: 2839 print('generate_random_csv_file', c) 2840 assert c == csv_count 2841 assert os.path.getsize(csv_path) > 0 2842 cache_path = self.import_csv_cache_path() 2843 if os.path.exists(cache_path): 2844 os.remove(cache_path) 2845 self.reset() 2846 (created, found, bad) = self.import_csv(csv_path, debug) 2847 bad_count = len(bad) 2848 assert bad_count > 0 2849 if debug: 2850 print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})") 2851 print('bad', bad) 2852 tmp_size = os.path.getsize(cache_path) 2853 assert tmp_size > 0 2854 # TODO: assert created + found + bad_count == csv_count 2855 # TODO: assert created == csv_count 2856 # TODO: assert bad_count == 0 2857 (created_2, found_2, bad_2) = self.import_csv(csv_path) 2858 bad_2_count = len(bad_2) 2859 if debug: 2860 print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})") 2861 print('bad', bad) 2862 assert bad_2_count > 0 2863 # TODO: assert tmp_size == os.path.getsize(cache_path) 2864 # TODO: assert created_2 + found_2 + bad_2_count == csv_count 2865 # TODO: assert created == found_2 2866 # TODO: assert bad_count == bad_2_count 2867 # TODO: assert found_2 == csv_count 2868 # TODO: assert bad_2_count == 0 2869 # TODO: assert created_2 == 0 2870 2871 # payment parts 2872 2873 positive_parts = self.build_payment_parts(100, positive_only=True) 2874 assert self.check_payment_parts(positive_parts) != 0 2875 assert self.check_payment_parts(positive_parts) != 0 2876 all_parts = self.build_payment_parts(300, positive_only=False) 2877 assert self.check_payment_parts(all_parts) != 0 2878 assert self.check_payment_parts(all_parts) != 0 2879 if debug: 2880 pp().pprint(positive_parts) 2881 pp().pprint(all_parts) 2882 # dynamic discount 2883 suite = [] 2884 count = 3 2885 for exceed in [False, True]: 2886 case = [] 2887 for parts in [positive_parts, all_parts]: 2888 part = parts.copy() 2889 demand = part['demand'] 2890 if debug: 2891 print(demand, part['total']) 2892 i = 0 2893 z = demand / count 2894 cp = { 2895 'account': {}, 2896 'demand': demand, 2897 'exceed': exceed, 2898 'total': part['total'], 2899 } 2900 j = '' 2901 for x, y in part['account'].items(): 2902 x_exchange = self.exchange(x) 2903 zz = self.exchange_calc(z, 1, x_exchange['rate']) 2904 if exceed and zz <= demand: 2905 i += 1 2906 y['part'] = zz 2907 if debug: 2908 print(exceed, y) 2909 cp['account'][x] = y 2910 case.append(y) 2911 elif not exceed and y['balance'] >= zz: 2912 i += 1 2913 y['part'] = zz 2914 if debug: 2915 print(exceed, y) 2916 cp['account'][x] = y 2917 case.append(y) 2918 j = x 2919 if i >= count: 2920 break 2921 if len(cp['account'][j]) > 0: 2922 suite.append(cp) 2923 if debug: 2924 print('suite', len(suite)) 2925 # vault = self._vault.copy() 2926 for case in suite: 2927 # self._vault = vault.copy() 2928 if debug: 2929 print('case', case) 2930 result = self.check_payment_parts(case) 2931 if debug: 2932 print('check_payment_parts', result, f'exceed: {exceed}') 2933 assert result == 0 2934 2935 report = self.check(2.17, None, debug) 2936 (valid, brief, plan) = report 2937 if debug: 2938 print('valid', valid) 2939 zakat_result = self.zakat(report, parts=case, debug=debug) 2940 if debug: 2941 print('zakat-result', zakat_result) 2942 assert valid == zakat_result 2943 2944 assert self.save(path + f'.{self.ext()}') 2945 assert self.export_json(path + '.json') 2946 2947 assert self.export_json("1000-transactions-test.json") 2948 assert self.save(f"1000-transactions-test.{self.ext()}") 2949 2950 self.reset() 2951 2952 # test transfer between accounts with different exchange rate 2953 2954 a_SAR = "Bank (SAR)" 2955 b_USD = "Bank (USD)" 2956 c_SAR = "Safe (SAR)" 2957 # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer 2958 for case in [ 2959 (0, a_SAR, "SAR Gift", 1000, 100000), 2960 (1, a_SAR, 1), 2961 (0, b_USD, "USD Gift", 500, 50000), 2962 (1, b_USD, 1), 2963 (2, b_USD, 3.75), 2964 (1, b_USD, 3.75), 2965 (3, 100, b_USD, a_SAR, "100 USD -> SAR", 40000, 137500), 2966 (0, c_SAR, "Salary", 750, 75000), 2967 (3, 375, c_SAR, b_USD, "375 SAR -> USD", 37500, 50000), 2968 (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 137125, 50100), 2969 ]: 2970 if debug: 2971 print('case', case) 2972 match (case[0]): 2973 case 0: # track 2974 _, account, desc, x, balance = case 2975 self.track(unscaled_value=x, desc=desc, account=account, debug=debug) 2976 2977 cached_value = self.balance(account, cached=True) 2978 fresh_value = self.balance(account, cached=False) 2979 if debug: 2980 print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value) 2981 assert cached_value == balance 2982 assert fresh_value == balance 2983 case 1: # check-exchange 2984 _, account, expected_rate = case 2985 t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2986 if debug: 2987 print('t-exchange', t_exchange) 2988 assert t_exchange['rate'] == expected_rate 2989 case 2: # do-exchange 2990 _, account, rate = case 2991 self.exchange(account, rate=rate, debug=debug) 2992 b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2993 if debug: 2994 print('b-exchange', b_exchange) 2995 assert b_exchange['rate'] == rate 2996 case 3: # transfer 2997 _, x, a, b, desc, a_balance, b_balance = case 2998 self.transfer(x, a, b, desc, debug=debug) 2999 3000 cached_value = self.balance(a, cached=True) 3001 fresh_value = self.balance(a, cached=False) 3002 if debug: 3003 print('account', a, 'cached_value', cached_value, 'fresh_value', fresh_value, 'a_balance', a_balance) 3004 assert cached_value == a_balance 3005 assert fresh_value == a_balance 3006 3007 cached_value = self.balance(b, cached=True) 3008 fresh_value = self.balance(b, cached=False) 3009 if debug: 3010 print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value) 3011 assert cached_value == b_balance 3012 assert fresh_value == b_balance 3013 3014 # Transfer all in many chunks randomly from B to A 3015 a_SAR_balance = 137125 3016 b_USD_balance = 50100 3017 b_USD_exchange = self.exchange(b_USD) 3018 amounts = ZakatTracker.create_random_list(b_USD_balance, max_value=1000) 3019 if debug: 3020 print('amounts', amounts) 3021 i = 0 3022 for x in amounts: 3023 if debug: 3024 print(f'{i} - transfer-with-exchange({x})') 3025 self.transfer( 3026 unscaled_amount=self.unscale(x), 3027 from_account=b_USD, 3028 to_account=a_SAR, 3029 desc=f"{x} USD -> SAR", 3030 debug=debug, 3031 ) 3032 3033 b_USD_balance -= x 3034 cached_value = self.balance(b_USD, cached=True) 3035 fresh_value = self.balance(b_USD, cached=False) 3036 if debug: 3037 print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3038 b_USD_balance) 3039 assert cached_value == b_USD_balance 3040 assert fresh_value == b_USD_balance 3041 3042 a_SAR_balance += int(x * b_USD_exchange['rate']) 3043 cached_value = self.balance(a_SAR, cached=True) 3044 fresh_value = self.balance(a_SAR, cached=False) 3045 if debug: 3046 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3047 a_SAR_balance, 'rate', b_USD_exchange['rate']) 3048 assert cached_value == a_SAR_balance 3049 assert fresh_value == a_SAR_balance 3050 i += 1 3051 3052 # Transfer all in many chunks randomly from C to A 3053 c_SAR_balance = 37500 3054 amounts = ZakatTracker.create_random_list(c_SAR_balance, max_value=1000) 3055 if debug: 3056 print('amounts', amounts) 3057 i = 0 3058 for x in amounts: 3059 if debug: 3060 print(f'{i} - transfer-with-exchange({x})') 3061 self.transfer( 3062 unscaled_amount=self.unscale(x), 3063 from_account=c_SAR, 3064 to_account=a_SAR, 3065 desc=f"{x} SAR -> a_SAR", 3066 debug=debug, 3067 ) 3068 3069 c_SAR_balance -= x 3070 cached_value = self.balance(c_SAR, cached=True) 3071 fresh_value = self.balance(c_SAR, cached=False) 3072 if debug: 3073 print('account', c_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3074 c_SAR_balance) 3075 assert cached_value == c_SAR_balance 3076 assert fresh_value == c_SAR_balance 3077 3078 a_SAR_balance += x 3079 cached_value = self.balance(a_SAR, cached=True) 3080 fresh_value = self.balance(a_SAR, cached=False) 3081 if debug: 3082 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3083 a_SAR_balance) 3084 assert cached_value == a_SAR_balance 3085 assert fresh_value == a_SAR_balance 3086 i += 1 3087 3088 assert self.export_json("accounts-transfer-with-exchange-rates.json") 3089 assert self.save(f"accounts-transfer-with-exchange-rates.{self.ext()}") 3090 3091 # check & zakat with exchange rates for many cycles 3092 3093 for rate, values in { 3094 1: { 3095 'in': [1000, 2000, 10000], 3096 'exchanged': [100000, 200000, 1000000], 3097 'out': [2500, 5000, 73140], 3098 }, 3099 3.75: { 3100 'in': [200, 1000, 5000], 3101 'exchanged': [75000, 375000, 1875000], 3102 'out': [1875, 9375, 137138], 3103 }, 3104 }.items(): 3105 a, b, c = values['in'] 3106 m, n, o = values['exchanged'] 3107 x, y, z = values['out'] 3108 if debug: 3109 print('rate', rate, 'values', values) 3110 for case in [ 3111 (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3112 {'safe': {0: {'below_nisab': x}}}, 3113 ], False, m), 3114 (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3115 {'safe': {0: {'count': 1, 'total': y}}}, 3116 ], True, n), 3117 (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [ 3118 {'cave': {0: {'count': 3, 'total': z}}}, 3119 ], True, o), 3120 ]: 3121 if debug: 3122 print(f"############# check(rate: {rate}) #############") 3123 print('case', case) 3124 self.reset() 3125 self.exchange(account=case[1], created=case[2], rate=rate) 3126 self.track(unscaled_value=case[0], desc='test-check', account=case[1], logging=True, created=case[2]) 3127 assert self.snapshot() 3128 3129 # assert self.nolock() 3130 # history_size = len(self._vault['history']) 3131 # print('history_size', history_size) 3132 # assert history_size == 2 3133 assert self.lock() 3134 assert not self.nolock() 3135 report = self.check(2.17, None, debug) 3136 (valid, brief, plan) = report 3137 if debug: 3138 print('brief', brief) 3139 assert valid == case[4] 3140 assert case[5] == brief[0] 3141 assert case[5] == brief[1] 3142 3143 if debug: 3144 pp().pprint(plan) 3145 3146 for x in plan: 3147 assert case[1] == x 3148 if 'total' in case[3][0][x][0].keys(): 3149 assert case[3][0][x][0]['total'] == int(brief[2]) 3150 assert int(plan[x][0]['total']) == case[3][0][x][0]['total'] 3151 assert int(plan[x][0]['count']) == case[3][0][x][0]['count'] 3152 else: 3153 assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab'] 3154 if debug: 3155 pp().pprint(report) 3156 result = self.zakat(report, debug=debug) 3157 if debug: 3158 print('zakat-result', result, case[4]) 3159 assert result == case[4] 3160 report = self.check(2.17, None, debug) 3161 (valid, brief, plan) = report 3162 assert valid is False 3163 3164 history_size = len(self._vault['history']) 3165 if debug: 3166 print('history_size', history_size) 3167 assert history_size == 3 3168 assert not self.nolock() 3169 assert self.recall(False, debug) is False 3170 self.free(self.lock()) 3171 assert self.nolock() 3172 3173 for i in range(3, 0, -1): 3174 history_size = len(self._vault['history']) 3175 if debug: 3176 print('history_size', history_size) 3177 assert history_size == i 3178 assert self.recall(False, debug) is True 3179 3180 assert self.nolock() 3181 assert self.recall(False, debug) is False 3182 3183 history_size = len(self._vault['history']) 3184 if debug: 3185 print('history_size', history_size) 3186 assert history_size == 0 3187 3188 account_size = len(self._vault['account']) 3189 if debug: 3190 print('account_size', account_size) 3191 assert account_size == 0 3192 3193 report_size = len(self._vault['report']) 3194 if debug: 3195 print('report_size', report_size) 3196 assert report_size == 0 3197 3198 assert self.nolock() 3199 return True 3200 except Exception as e: 3201 # pp().pprint(self._vault) 3202 assert self.export_json("test-snapshot.json") 3203 assert self.save(f"test-snapshot.{self.ext()}") 3204 raise e
A class for tracking and calculating Zakat.
This class provides functionalities for recording transactions, calculating Zakat due, and managing account balances. It also offers features like importing transactions from CSV files, exporting data to JSON format, and saving/loading the tracker state.
The ZakatTracker
class is designed to handle both positive and negative transactions,
allowing for flexible tracking of financial activities related to Zakat. It also supports
the concept of a "Nisab" (minimum threshold for Zakat) and a "haul" (complete one year for Transaction) can calculate Zakat due
based on the current silver price.
The class uses a camel file as its database to persist the tracker state, ensuring data integrity across sessions. It also provides options for enabling or disabling history tracking, allowing users to choose their preferred level of detail.
In addition, the ZakatTracker
class includes various helper methods like
time
, time_to_datetime
, lock
, free
, recall
, export_json
,
and more. These methods provide additional functionalities and flexibility
for interacting with and managing the Zakat tracker.
Attributes: ZakatTracker.ZakatCut (function): A function to calculate the Zakat percentage. ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat. ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price. ZakatTracker.Version (function): The version of the ZakatTracker class.
Data Structure: The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data.
_vault (dict):
- account (dict):
- {account_number} (dict):
- balance (int): The current balance of the account.
- box (dict): A dictionary storing transaction details.
- {timestamp} (dict):
- capital (int): The initial amount of the transaction.
- count (int): The number of times Zakat has been calculated for this transaction.
- last (int): The timestamp of the last Zakat calculation.
- rest (int): The remaining amount after Zakat deductions and withdrawal.
- total (int): The total Zakat deducted from this transaction.
- count (int): The total number of transactions for the account.
- log (dict): A dictionary storing transaction logs.
- {timestamp} (dict):
- value (int): The transaction amount (positive or negative).
- desc (str): The description of the transaction.
- ref (int): The box reference (positive or None).
- file (dict): A dictionary storing file references associated with the transaction.
- hide (bool): Indicates whether the account is hidden or not.
- zakatable (bool): Indicates whether the account is subject to Zakat.
- exchange (dict):
- account (dict):
- {timestamps} (dict):
- rate (float): Exchange rate when compared to local currency.
- description (str): The description of the exchange rate.
- history (dict):
- {timestamp} (list): A list of dictionaries storing the history of actions performed.
- {action_dict} (dict):
- action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT).
- account (str): The account number associated with the action.
- ref (int): The reference number of the transaction.
- file (int): The reference number of the file (if applicable).
- key (str): The key associated with the action (e.g., 'rest', 'total').
- value (int): The value associated with the action.
- math (MathOperation): The mathematical operation performed (if applicable).
- lock (int or None): The timestamp indicating the current lock status (None if not locked).
- report (dict):
- {timestamp} (tuple): A tuple storing Zakat report details.
270 def __init__(self, db_path: str = "./zakat_db/zakat.camel", history_mode: bool = True): 271 """ 272 Initialize ZakatTracker with database path and history mode. 273 274 Parameters: 275 db_path (str): The path to the database file. Default is "zakat.camel". 276 history_mode (bool): The mode for tracking history. Default is True. 277 278 Returns: 279 None 280 """ 281 self._base_path = None 282 self._vault_path = None 283 self._vault = None 284 self.reset() 285 self._history(history_mode) 286 self.path(db_path)
Initialize ZakatTracker with database path and history mode.
Parameters: db_path (str): The path to the database file. Default is "zakat.camel". history_mode (bool): The mode for tracking history. Default is True.
Returns: None
194 @staticmethod 195 def Version() -> str: 196 """ 197 Returns the current version of the software. 198 199 This function returns a string representing the current version of the software, 200 including major, minor, and patch version numbers in the format "X.Y.Z". 201 202 Returns: 203 str: The current version of the software. 204 """ 205 return '0.2.87'
Returns the current version of the software.
This function returns a string representing the current version of the software, including major, minor, and patch version numbers in the format "X.Y.Z".
Returns: str: The current version of the software.
207 @staticmethod 208 def ZakatCut(x: float) -> float: 209 """ 210 Calculates the Zakat amount due on an asset. 211 212 This function calculates the zakat amount due on a given asset value over one lunar year. 213 Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth 214 that exceeds a certain threshold (Nisab). 215 216 Parameters: 217 x: The total value of the asset on which Zakat is to be calculated. 218 219 Returns: 220 The amount of Zakat due on the asset, calculated as 2.5% of the asset's value. 221 """ 222 return 0.025 * x # Zakat Cut in one Lunar Year
Calculates the Zakat amount due on an asset.
This function calculates the zakat amount due on a given asset value over one lunar year. Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth that exceeds a certain threshold (Nisab).
Parameters: x: The total value of the asset on which Zakat is to be calculated.
Returns: The amount of Zakat due on the asset, calculated as 2.5% of the asset's value.
224 @staticmethod 225 def TimeCycle(days: int = 355) -> int: 226 """ 227 Calculates the approximate duration of a lunar year in nanoseconds. 228 229 This function calculates the approximate duration of a lunar year based on the given number of days. 230 It converts the given number of days into nanoseconds for use in high-precision timing applications. 231 232 Parameters: 233 days: The number of days in a lunar year. Defaults to 355, 234 which is an approximation of the average length of a lunar year. 235 236 Returns: 237 The approximate duration of a lunar year in nanoseconds. 238 """ 239 return int(60 * 60 * 24 * days * 1e9) # Lunar Year in nanoseconds
Calculates the approximate duration of a lunar year in nanoseconds.
This function calculates the approximate duration of a lunar year based on the given number of days. It converts the given number of days into nanoseconds for use in high-precision timing applications.
Parameters: days: The number of days in a lunar year. Defaults to 355, which is an approximation of the average length of a lunar year.
Returns: The approximate duration of a lunar year in nanoseconds.
241 @staticmethod 242 def Nisab(gram_price: float, gram_quantity: float = 595) -> float: 243 """ 244 Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram. 245 246 This function calculates the Nisab value, which is the minimum threshold of wealth, 247 that makes an individual liable for paying Zakat. 248 The Nisab value is determined by the equivalent value of a specific amount 249 of gold or silver (currently 595 grams in silver) in the local currency. 250 251 Parameters: 252 - gram_price (float): The price per gram of Nisab. 253 - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver. 254 255 Returns: 256 - float: The total value of Nisab based on the given price per gram. 257 """ 258 return gram_price * gram_quantity
Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram.
This function calculates the Nisab value, which is the minimum threshold of wealth, that makes an individual liable for paying Zakat. The Nisab value is determined by the equivalent value of a specific amount of gold or silver (currently 595 grams in silver) in the local currency.
Parameters:
- gram_price (float): The price per gram of Nisab.
- gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver.
Returns:
- float: The total value of Nisab based on the given price per gram.
260 @staticmethod 261 def ext() -> str: 262 """ 263 Returns the file extension used by the ZakatTracker class. 264 265 Returns: 266 str: The file extension used by the ZakatTracker class, which is 'camel'. 267 """ 268 return 'camel'
Returns the file extension used by the ZakatTracker class.
Returns: str: The file extension used by the ZakatTracker class, which is 'camel'.
288 def path(self, path: str = None) -> str: 289 """ 290 Set or get the path to the database file. 291 292 If no path is provided, the current path is returned. 293 If a path is provided, it is set as the new path. 294 The function also creates the necessary directories if the provided path is a file. 295 296 Parameters: 297 path (str): The new path to the database file. If not provided, the current path is returned. 298 299 Returns: 300 str: The current or new path to the database file. 301 """ 302 if path is None: 303 return self._vault_path 304 self._vault_path = Path(path).resolve() 305 base_path = Path(path).resolve() 306 if base_path.is_file() or base_path.suffix: 307 base_path = base_path.parent 308 base_path.mkdir(parents=True, exist_ok=True) 309 self._base_path = base_path 310 return str(self._vault_path)
Set or get the path to the database file.
If no path is provided, the current path is returned. If a path is provided, it is set as the new path. The function also creates the necessary directories if the provided path is a file.
Parameters: path (str): The new path to the database file. If not provided, the current path is returned.
Returns: str: The current or new path to the database file.
312 def base_path(self, *args) -> str: 313 """ 314 Generate a base path by joining the provided arguments with the existing base path. 315 316 Parameters: 317 *args (str): Variable length argument list of strings to be joined with the base path. 318 319 Returns: 320 str: The generated base path. If no arguments are provided, the existing base path is returned. 321 """ 322 if not args: 323 return str(self._base_path) 324 filtered_args = [] 325 ignored_filename = None 326 for arg in args: 327 if Path(arg).suffix: 328 ignored_filename = arg 329 else: 330 filtered_args.append(arg) 331 base_path = Path(self._base_path) 332 full_path = base_path.joinpath(*filtered_args) 333 full_path.mkdir(parents=True, exist_ok=True) 334 if ignored_filename is not None: 335 return full_path.resolve() / ignored_filename # Join with the ignored filename 336 return str(full_path.resolve())
Generate a base path by joining the provided arguments with the existing base path.
Parameters: *args (str): Variable length argument list of strings to be joined with the base path.
Returns: str: The generated base path. If no arguments are provided, the existing base path is returned.
338 @staticmethod 339 def scale(x: float | int | Decimal, decimal_places: int = 2) -> int: 340 """ 341 Scales a numerical value by a specified power of 10, returning an integer. 342 343 This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and 344 facilitate precise scaling operations, particularly useful in financial or scientific calculations. 345 346 Parameters: 347 x: The numeric value to scale. Can be a floating-point number, integer, or decimal. 348 decimal_places: The exponent for the scaling factor (10**y). Defaults to 2, meaning the input is scaled 349 by a factor of 100 (e.g., converts 1.23 to 123). 350 351 Returns: 352 The scaled value, rounded to the nearest integer. 353 354 Raises: 355 TypeError: If the input `x` is not a valid numeric type. 356 357 Examples: 358 >>> ZakatTracker.scale(3.14159) 359 314 360 >>> ZakatTracker.scale(1234, decimal_places=3) 361 1234000 362 >>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4) 363 50 364 """ 365 if not isinstance(x, (float, int, Decimal)): 366 raise TypeError("Input 'x' must be a float, int, or Decimal.") 367 return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))
Scales a numerical value by a specified power of 10, returning an integer.
This function is designed to handle various numeric types (float
, int
, or Decimal
) and
facilitate precise scaling operations, particularly useful in financial or scientific calculations.
Parameters: x: The numeric value to scale. Can be a floating-point number, integer, or decimal. decimal_places: The exponent for the scaling factor (10**y). Defaults to 2, meaning the input is scaled by a factor of 100 (e.g., converts 1.23 to 123).
Returns: The scaled value, rounded to the nearest integer.
Raises:
TypeError: If the input x
is not a valid numeric type.
Examples:
>>> ZakatTracker.scale(3.14159)
314
>>> ZakatTracker.scale(1234, decimal_places=3)
1234000
>>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4)
50
369 @staticmethod 370 def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal: 371 """ 372 Unscales an integer by a power of 10. 373 374 Parameters: 375 x: The integer to unscale. 376 return_type: The desired type for the returned value. Can be float, int, or Decimal. Defaults to float. 377 decimal_places: The power of 10 to use. Defaults to 2. 378 379 Returns: 380 The unscaled number, converted to the specified return_type. 381 382 Raises: 383 TypeError: If the return_type is not float or Decimal. 384 """ 385 if return_type not in (float, Decimal): 386 raise TypeError(f'Invalid return_type({return_type}). Supported types are float, int, and Decimal.') 387 return round(return_type(x / (10 ** decimal_places)), decimal_places)
Unscales an integer by a power of 10.
Parameters: x: The integer to unscale. return_type: The desired type for the returned value. Can be float, int, or Decimal. Defaults to float. decimal_places: The power of 10 to use. Defaults to 2.
Returns: The unscaled number, converted to the specified return_type.
Raises: TypeError: If the return_type is not float or Decimal.
403 def reset(self) -> None: 404 """ 405 Reset the internal data structure to its initial state. 406 407 Parameters: 408 None 409 410 Returns: 411 None 412 """ 413 self._vault = { 414 'account': {}, 415 'exchange': {}, 416 'history': {}, 417 'lock': None, 418 'report': {}, 419 }
Reset the internal data structure to its initial state.
Parameters: None
Returns: None
421 @staticmethod 422 def time(now: datetime = None) -> int: 423 """ 424 Generates a timestamp based on the provided datetime object or the current datetime. 425 426 Parameters: 427 now (datetime, optional): The datetime object to generate the timestamp from. 428 If not provided, the current datetime is used. 429 430 Returns: 431 int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970), 432 before 1970 will return in negative until 1000AD. 433 """ 434 if now is None: 435 now = datetime.datetime.now() 436 ordinal_day = now.toordinal() 437 ns_in_day = (now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() * 10 ** 9 438 return int((ordinal_day - 719_163) * 86_400_000_000_000 + ns_in_day)
Generates a timestamp based on the provided datetime object or the current datetime.
Parameters: now (datetime, optional): The datetime object to generate the timestamp from. If not provided, the current datetime is used.
Returns: int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970), before 1970 will return in negative until 1000AD.
440 @staticmethod 441 def time_to_datetime(ordinal_ns: int) -> datetime: 442 """ 443 Converts an ordinal number (number of days since 1000-01-01) to a datetime object. 444 445 Parameters: 446 ordinal_ns (int): The ordinal number of days since 1000-01-01. 447 448 Returns: 449 datetime: The corresponding datetime object. 450 """ 451 ordinal_day = ordinal_ns // 86_400_000_000_000 + 719_163 452 ns_in_day = ordinal_ns % 86_400_000_000_000 453 d = datetime.datetime.fromordinal(ordinal_day) 454 t = datetime.timedelta(seconds=ns_in_day // 10 ** 9) 455 return datetime.datetime.combine(d, datetime.time()) + t
Converts an ordinal number (number of days since 1000-01-01) to a datetime object.
Parameters: ordinal_ns (int): The ordinal number of days since 1000-01-01.
Returns: datetime: The corresponding datetime object.
457 def clean_history(self, lock: int | None = None) -> int: 458 """ 459 Cleans up the history of actions performed on the ZakatTracker instance. 460 461 Parameters: 462 lock (int, optional): The lock ID is used to clean up the empty history. 463 If not provided, it cleans up the empty history records for all locks. 464 465 Returns: 466 int: The number of locks cleaned up. 467 """ 468 count = 0 469 if lock in self._vault['history']: 470 if len(self._vault['history'][lock]) <= 0: 471 count += 1 472 del self._vault['history'][lock] 473 return count 474 self.free(self.lock()) 475 for lock in self._vault['history']: 476 if len(self._vault['history'][lock]) <= 0: 477 count += 1 478 del self._vault['history'][lock] 479 return count
Cleans up the history of actions performed on the ZakatTracker instance.
Parameters: lock (int, optional): The lock ID is used to clean up the empty history. If not provided, it cleans up the empty history records for all locks.
Returns: int: The number of locks cleaned up.
517 def nolock(self) -> bool: 518 """ 519 Check if the vault lock is currently not set. 520 521 Returns: 522 bool: True if the vault lock is not set, False otherwise. 523 """ 524 return self._vault['lock'] is None
Check if the vault lock is currently not set.
Returns: bool: True if the vault lock is not set, False otherwise.
526 def lock(self) -> int: 527 """ 528 Acquires a lock on the ZakatTracker instance. 529 530 Returns: 531 int: The lock ID. This ID can be used to release the lock later. 532 """ 533 return self._step()
Acquires a lock on the ZakatTracker instance.
Returns: int: The lock ID. This ID can be used to release the lock later.
535 def vault(self) -> dict: 536 """ 537 Returns a copy of the internal vault dictionary. 538 539 This method is used to retrieve the current state of the ZakatTracker object. 540 It provides a snapshot of the internal data structure, allowing for further 541 processing or analysis. 542 543 Returns: 544 dict: A copy of the internal vault dictionary. 545 """ 546 return self._vault.copy()
Returns a copy of the internal vault dictionary.
This method is used to retrieve the current state of the ZakatTracker object. It provides a snapshot of the internal data structure, allowing for further processing or analysis.
Returns: dict: A copy of the internal vault dictionary.
548 def stats(self) -> dict[str, tuple]: 549 """ 550 Calculates and returns statistics about the object's data storage. 551 552 This method determines the size of the database file on disk and the 553 size of the data currently held in RAM (likely within a dictionary). 554 Both sizes are reported in bytes and in a human-readable format 555 (e.g., KB, MB). 556 557 Returns: 558 dict[str, tuple]: A dictionary containing the following statistics: 559 560 * 'database': A tuple with two elements: 561 - The database file size in bytes (int). 562 - The database file size in human-readable format (str). 563 * 'ram': A tuple with two elements: 564 - The RAM usage (dictionary size) in bytes (int). 565 - The RAM usage in human-readable format (str). 566 567 Example: 568 >>> stats = my_object.stats() 569 >>> print(stats['database']) 570 (256000, '250.0 KB') 571 >>> print(stats['ram']) 572 (12345, '12.1 KB') 573 """ 574 ram_size = self.get_dict_size(self.vault()) 575 file_size = os.path.getsize(self.path()) 576 return { 577 'database': (file_size, self.human_readable_size(file_size)), 578 'ram': (ram_size, self.human_readable_size(ram_size)), 579 }
Calculates and returns statistics about the object's data storage.
This method determines the size of the database file on disk and the size of the data currently held in RAM (likely within a dictionary). Both sizes are reported in bytes and in a human-readable format (e.g., KB, MB).
Returns: dict[str, tuple]: A dictionary containing the following statistics:
* 'database': A tuple with two elements:
- The database file size in bytes (int).
- The database file size in human-readable format (str).
* 'ram': A tuple with two elements:
- The RAM usage (dictionary size) in bytes (int).
- The RAM usage in human-readable format (str).
Example:
>>> stats = my_object.stats()
>>> print(stats['database'])
(256000, '250.0 KB')
>>> print(stats['ram'])
(12345, '12.1 KB')
581 def files(self) -> list[dict[str, str | int]]: 582 """ 583 Retrieves information about files associated with this class. 584 585 This class method provides a standardized way to gather details about 586 files used by the class for storage, snapshots, and CSV imports. 587 588 Returns: 589 list[dict[str, str | int]]: A list of dictionaries, each containing information 590 about a specific file: 591 592 * type (str): The type of file ('database', 'snapshot', 'import_csv'). 593 * path (str): The full file path. 594 * exists (bool): Whether the file exists on the filesystem. 595 * size (int): The file size in bytes (0 if the file doesn't exist). 596 * human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB'). 597 598 Example: 599 ``` 600 file_info = MyClass.files() 601 for info in file_info: 602 print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}") 603 ``` 604 """ 605 result = [] 606 for file_type, path in { 607 'database': self.path(), 608 'snapshot': self.snapshot_cache_path(), 609 'import_csv': self.import_csv_cache_path(), 610 }.items(): 611 exists = os.path.exists(path) 612 size = os.path.getsize(path) if exists else 0 613 human_readable_size = self.human_readable_size(size) if exists else 0 614 result.append({ 615 'type': file_type, 616 'path': path, 617 'exists': exists, 618 'size': size, 619 'human_readable_size': human_readable_size, 620 }) 621 return result
Retrieves information about files associated with this class.
This class method provides a standardized way to gather details about files used by the class for storage, snapshots, and CSV imports.
Returns: list[dict[str, str | int]]: A list of dictionaries, each containing information about a specific file:
* type (str): The type of file ('database', 'snapshot', 'import_csv').
* path (str): The full file path.
* exists (bool): Whether the file exists on the filesystem.
* size (int): The file size in bytes (0 if the file doesn't exist).
* human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB').
Example:
file_info = MyClass.files()
for info in file_info:
print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}")
623 def steps(self) -> dict: 624 """ 625 Returns a copy of the history of steps taken in the ZakatTracker. 626 627 The history is a dictionary where each key is a unique identifier for a step, 628 and the corresponding value is a dictionary containing information about the step. 629 630 Returns: 631 dict: A copy of the history of steps taken in the ZakatTracker. 632 """ 633 return self._vault['history'].copy()
Returns a copy of the history of steps taken in the ZakatTracker.
The history is a dictionary where each key is a unique identifier for a step, and the corresponding value is a dictionary containing information about the step.
Returns: dict: A copy of the history of steps taken in the ZakatTracker.
635 def free(self, lock: int, auto_save: bool = True) -> bool: 636 """ 637 Releases the lock on the database. 638 639 Parameters: 640 lock (int): The lock ID to be released. 641 auto_save (bool): Whether to automatically save the database after releasing the lock. 642 643 Returns: 644 bool: True if the lock is successfully released and (optionally) saved, False otherwise. 645 """ 646 if lock == self._vault['lock']: 647 self._vault['lock'] = None 648 self.clean_history(lock) 649 if auto_save: 650 return self.save(self.path()) 651 return True 652 return False
Releases the lock on the database.
Parameters: lock (int): The lock ID to be released. auto_save (bool): Whether to automatically save the database after releasing the lock.
Returns: bool: True if the lock is successfully released and (optionally) saved, False otherwise.
654 def account_exists(self, account) -> bool: 655 """ 656 Check if the given account exists in the vault. 657 658 Parameters: 659 account (str): The account number to check. 660 661 Returns: 662 bool: True if the account exists, False otherwise. 663 """ 664 return account in self._vault['account']
Check if the given account exists in the vault.
Parameters: account (str): The account number to check.
Returns: bool: True if the account exists, False otherwise.
666 def box_size(self, account) -> int: 667 """ 668 Calculate the size of the box for a specific account. 669 670 Parameters: 671 account (str): The account number for which the box size needs to be calculated. 672 673 Returns: 674 int: The size of the box for the given account. If the account does not exist, -1 is returned. 675 """ 676 if self.account_exists(account): 677 return len(self._vault['account'][account]['box']) 678 return -1
Calculate the size of the box for a specific account.
Parameters: account (str): The account number for which the box size needs to be calculated.
Returns: int: The size of the box for the given account. If the account does not exist, -1 is returned.
680 def log_size(self, account) -> int: 681 """ 682 Get the size of the log for a specific account. 683 684 Parameters: 685 account (str): The account number for which the log size needs to be calculated. 686 687 Returns: 688 int: The size of the log for the given account. If the account does not exist, -1 is returned. 689 """ 690 if self.account_exists(account): 691 return len(self._vault['account'][account]['log']) 692 return -1
Get the size of the log for a specific account.
Parameters: account (str): The account number for which the log size needs to be calculated.
Returns: int: The size of the log for the given account. If the account does not exist, -1 is returned.
694 @staticmethod 695 def file_hash(file_path: str, algorithm: str = "blake2b") -> str: 696 """ 697 Calculates the hash of a file using the specified algorithm. 698 699 Parameters: 700 file_path (str): The path to the file. 701 algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b". 702 703 Returns: 704 str: The hexadecimal representation of the file's hash. 705 """ 706 hash_obj = hashlib.new(algorithm) # Create the hash object 707 with open(file_path, "rb") as f: # Open file in binary mode for reading 708 for chunk in iter(lambda: f.read(4096), b""): # Read file in chunks 709 hash_obj.update(chunk) 710 return hash_obj.hexdigest() # Return the hash as a hexadecimal string
Calculates the hash of a file using the specified algorithm.
Parameters: file_path (str): The path to the file. algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b".
Returns: str: The hexadecimal representation of the file's hash.
712 def snapshot_cache_path(self): 713 """ 714 Generate the path for the cache file used to store snapshots. 715 716 The cache file is a camel file that stores the timestamps of the snapshots. 717 The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel". 718 719 Returns: 720 str: The path to the cache file. 721 """ 722 path = str(self.path()) 723 ext = self.ext() 724 ext_len = len(ext) 725 if path.endswith(f'.{ext}'): 726 path = path[:-ext_len-1] 727 _, filename = os.path.split(path + f'.snapshots.{ext}') 728 return self.base_path(filename)
Generate the path for the cache file used to store snapshots.
The cache file is a camel file that stores the timestamps of the snapshots. The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel".
Returns: str: The path to the cache file.
730 def snapshot(self) -> bool: 731 """ 732 This function creates a snapshot of the current database state. 733 734 The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. 735 If a snapshot with the same hash exists, the function returns True without creating a new snapshot. 736 If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state 737 in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp. 738 739 Parameters: 740 None 741 742 Returns: 743 bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails. 744 """ 745 current_hash = self.file_hash(self.path()) 746 cache: dict[str, int] = {} # hash: time_ns 747 try: 748 with open(self.snapshot_cache_path(), 'r') as stream: 749 cache = camel.load(stream.read()) 750 except: 751 pass 752 if current_hash in cache: 753 return True 754 time = time_ns() 755 cache[current_hash] = time 756 if not self.save(self.base_path('snapshots', f'{time}.{self.ext()}')): 757 return False 758 with open(self.snapshot_cache_path(), 'w') as stream: 759 stream.write(camel.dump(cache)) 760 return True
This function creates a snapshot of the current database state.
The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. If a snapshot with the same hash exists, the function returns True without creating a new snapshot. If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp.
Parameters: None
Returns: bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails.
762 def snapshots(self, hide_missing: bool = True, verified_hash_only: bool = False) \ 763 -> dict[int, tuple[str, str, bool]]: 764 """ 765 Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status. 766 767 Parameters: 768 - hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True. 769 - verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False. 770 771 Returns: 772 - dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, 773 and the values are tuples containing the snapshot's hash, path, and existence status. 774 """ 775 cache: dict[str, int] = {} # hash: time_ns 776 try: 777 with open(self.snapshot_cache_path(), 'r') as stream: 778 cache = camel.load(stream.read()) 779 except: 780 pass 781 if not cache: 782 return {} 783 result: dict[int, tuple[str, str, bool]] = {} # time_ns: (hash, path, exists) 784 for file_hash, ref in cache.items(): 785 path = self.base_path('snapshots', f'{ref}.{self.ext()}') 786 exists = os.path.exists(path) 787 valid_hash = self.file_hash(path) == file_hash if verified_hash_only else True 788 if (verified_hash_only and not valid_hash) or (verified_hash_only and not exists): 789 continue 790 if exists or not hide_missing: 791 result[ref] = (file_hash, path, exists) 792 return result
Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status.
Parameters:
- hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True.
- verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False.
Returns:
- dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, and the values are tuples containing the snapshot's hash, path, and existence status.
794 def recall(self, dry=True, debug=False) -> bool: 795 """ 796 Revert the last operation. 797 798 Parameters: 799 dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. 800 debug (bool): If True, the function will print debug information. Default is False. 801 802 Returns: 803 bool: True if the operation was successful, False otherwise. 804 """ 805 if not self.nolock() or len(self._vault['history']) == 0: 806 return False 807 if len(self._vault['history']) <= 0: 808 return False 809 ref = sorted(self._vault['history'].keys())[-1] 810 if debug: 811 print('recall', ref) 812 memory = self._vault['history'][ref] 813 if debug: 814 print(type(memory), 'memory', memory) 815 limit = len(memory) + 1 816 sub_positive_log_negative = 0 817 for i in range(-1, -limit, -1): 818 x = memory[i] 819 if debug: 820 print(type(x), x) 821 match x['action']: 822 case Action.CREATE: 823 if x['account'] is not None: 824 if self.account_exists(x['account']): 825 if debug: 826 print('account', self._vault['account'][x['account']]) 827 assert len(self._vault['account'][x['account']]['box']) == 0 828 assert self._vault['account'][x['account']]['balance'] == 0 829 assert self._vault['account'][x['account']]['count'] == 0 830 if dry: 831 continue 832 del self._vault['account'][x['account']] 833 834 case Action.TRACK: 835 if x['account'] is not None: 836 if self.account_exists(x['account']): 837 if dry: 838 continue 839 self._vault['account'][x['account']]['balance'] -= x['value'] 840 self._vault['account'][x['account']]['count'] -= 1 841 del self._vault['account'][x['account']]['box'][x['ref']] 842 843 case Action.LOG: 844 if x['account'] is not None: 845 if self.account_exists(x['account']): 846 if x['ref'] in self._vault['account'][x['account']]['log']: 847 if dry: 848 continue 849 if sub_positive_log_negative == -x['value']: 850 self._vault['account'][x['account']]['count'] -= 1 851 sub_positive_log_negative = 0 852 box_ref = self._vault['account'][x['account']]['log'][x['ref']]['ref'] 853 if not box_ref is None: 854 assert self.box_exists(x['account'], box_ref) 855 box_value = self._vault['account'][x['account']]['log'][x['ref']]['value'] 856 assert box_value < 0 857 858 try: 859 self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value 860 except TypeError: 861 self._vault['account'][x['account']]['box'][box_ref]['rest'] += Decimal( 862 -box_value) 863 864 try: 865 self._vault['account'][x['account']]['balance'] += -box_value 866 except TypeError: 867 self._vault['account'][x['account']]['balance'] += Decimal(-box_value) 868 869 self._vault['account'][x['account']]['count'] -= 1 870 del self._vault['account'][x['account']]['log'][x['ref']] 871 872 case Action.SUB: 873 if x['account'] is not None: 874 if self.account_exists(x['account']): 875 if x['ref'] in self._vault['account'][x['account']]['box']: 876 if dry: 877 continue 878 self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value'] 879 self._vault['account'][x['account']]['balance'] += x['value'] 880 sub_positive_log_negative = x['value'] 881 882 case Action.ADD_FILE: 883 if x['account'] is not None: 884 if self.account_exists(x['account']): 885 if x['ref'] in self._vault['account'][x['account']]['log']: 886 if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']: 887 if dry: 888 continue 889 del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] 890 891 case Action.REMOVE_FILE: 892 if x['account'] is not None: 893 if self.account_exists(x['account']): 894 if x['ref'] in self._vault['account'][x['account']]['log']: 895 if dry: 896 continue 897 self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value'] 898 899 case Action.BOX_TRANSFER: 900 if x['account'] is not None: 901 if self.account_exists(x['account']): 902 if x['ref'] in self._vault['account'][x['account']]['box']: 903 if dry: 904 continue 905 self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value'] 906 907 case Action.EXCHANGE: 908 if x['account'] is not None: 909 if x['account'] in self._vault['exchange']: 910 if x['ref'] in self._vault['exchange'][x['account']]: 911 if dry: 912 continue 913 del self._vault['exchange'][x['account']][x['ref']] 914 915 case Action.REPORT: 916 if x['ref'] in self._vault['report']: 917 if dry: 918 continue 919 del self._vault['report'][x['ref']] 920 921 case Action.ZAKAT: 922 if x['account'] is not None: 923 if self.account_exists(x['account']): 924 if x['ref'] in self._vault['account'][x['account']]['box']: 925 if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]: 926 if dry: 927 continue 928 match x['math']: 929 case MathOperation.ADDITION: 930 self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x[ 931 'value'] 932 case MathOperation.EQUAL: 933 self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value'] 934 case MathOperation.SUBTRACTION: 935 self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x[ 936 'value'] 937 938 if not dry: 939 del self._vault['history'][ref] 940 return True
Revert the last operation.
Parameters: dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. debug (bool): If True, the function will print debug information. Default is False.
Returns: bool: True if the operation was successful, False otherwise.
942 def ref_exists(self, account: str, ref_type: str, ref: int) -> bool: 943 """ 944 Check if a specific reference (transaction) exists in the vault for a given account and reference type. 945 946 Parameters: 947 account (str): The account number for which to check the existence of the reference. 948 ref_type (str): The type of reference (e.g., 'box', 'log', etc.). 949 ref (int): The reference (transaction) number to check for existence. 950 951 Returns: 952 bool: True if the reference exists for the given account and reference type, False otherwise. 953 """ 954 if account in self._vault['account']: 955 return ref in self._vault['account'][account][ref_type] 956 return False
Check if a specific reference (transaction) exists in the vault for a given account and reference type.
Parameters: account (str): The account number for which to check the existence of the reference. ref_type (str): The type of reference (e.g., 'box', 'log', etc.). ref (int): The reference (transaction) number to check for existence.
Returns: bool: True if the reference exists for the given account and reference type, False otherwise.
958 def box_exists(self, account: str, ref: int) -> bool: 959 """ 960 Check if a specific box (transaction) exists in the vault for a given account and reference. 961 962 Parameters: 963 - account (str): The account number for which to check the existence of the box. 964 - ref (int): The reference (transaction) number to check for existence. 965 966 Returns: 967 - bool: True if the box exists for the given account and reference, False otherwise. 968 """ 969 return self.ref_exists(account, 'box', ref)
Check if a specific box (transaction) exists in the vault for a given account and reference.
Parameters:
- account (str): The account number for which to check the existence of the box.
- ref (int): The reference (transaction) number to check for existence.
Returns:
- bool: True if the box exists for the given account and reference, False otherwise.
971 def track(self, unscaled_value: float | int | Decimal = 0, desc: str = '', account: str = 1, logging: bool = True, 972 created: int = None, 973 debug: bool = False) -> int: 974 """ 975 This function tracks a transaction for a specific account. 976 977 Parameters: 978 unscaled_value (float | int | Decimal): The value of the transaction. Default is 0. 979 desc (str): The description of the transaction. Default is an empty string. 980 account (str): The account for which the transaction is being tracked. Default is '1'. 981 logging (bool): Whether to log the transaction. Default is True. 982 created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. 983 debug (bool): Whether to print debug information. Default is False. 984 985 Returns: 986 int: The timestamp of the transaction. 987 988 This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box. 989 990 Raises: 991 ValueError: The log transaction happened again in the same nanosecond time. 992 ValueError: The box transaction happened again in the same nanosecond time. 993 """ 994 if debug: 995 print('track', f'unscaled_value={unscaled_value}, debug={debug}') 996 if created is None: 997 created = self.time() 998 no_lock = self.nolock() 999 self.lock() 1000 if not self.account_exists(account): 1001 if debug: 1002 print(f"account {account} created") 1003 self._vault['account'][account] = { 1004 'balance': 0, 1005 'box': {}, 1006 'count': 0, 1007 'log': {}, 1008 'hide': False, 1009 'zakatable': True, 1010 } 1011 self._step(Action.CREATE, account) 1012 if unscaled_value == 0: 1013 if no_lock: 1014 self.free(self.lock()) 1015 return 0 1016 value = self.scale(unscaled_value) 1017 if logging: 1018 self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug) 1019 if debug: 1020 print('create-box', created) 1021 if self.box_exists(account, created): 1022 raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).") 1023 if debug: 1024 print('created-box', created) 1025 self._vault['account'][account]['box'][created] = { 1026 'capital': value, 1027 'count': 0, 1028 'last': 0, 1029 'rest': value, 1030 'total': 0, 1031 } 1032 self._step(Action.TRACK, account, ref=created, value=value) 1033 if no_lock: 1034 self.free(self.lock()) 1035 return created
This function tracks a transaction for a specific account.
Parameters: unscaled_value (float | int | Decimal): The value of the transaction. Default is 0. desc (str): The description of the transaction. Default is an empty string. account (str): The account for which the transaction is being tracked. Default is '1'. logging (bool): Whether to log the transaction. Default is True. created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. debug (bool): Whether to print debug information. Default is False.
Returns: int: The timestamp of the transaction.
This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box.
Raises: ValueError: The log transaction happened again in the same nanosecond time. ValueError: The box transaction happened again in the same nanosecond time.
1037 def log_exists(self, account: str, ref: int) -> bool: 1038 """ 1039 Checks if a specific transaction log entry exists for a given account. 1040 1041 Parameters: 1042 account (str): The account number associated with the transaction log. 1043 ref (int): The reference to the transaction log entry. 1044 1045 Returns: 1046 bool: True if the transaction log entry exists, False otherwise. 1047 """ 1048 return self.ref_exists(account, 'log', ref)
Checks if a specific transaction log entry exists for a given account.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry.
Returns: bool: True if the transaction log entry exists, False otherwise.
1094 def exchange(self, account, created: int = None, rate: float = None, description: str = None, 1095 debug: bool = False) -> dict: 1096 """ 1097 This method is used to record or retrieve exchange rates for a specific account. 1098 1099 Parameters: 1100 - account (str): The account number for which the exchange rate is being recorded or retrieved. 1101 - created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used. 1102 - rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate. 1103 - description (str): A description of the exchange rate. 1104 1105 Returns: 1106 - dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, 1107 it returns a dictionary with default values for the rate and description. 1108 """ 1109 if debug: 1110 print('exchange', f'debug={debug}') 1111 if created is None: 1112 created = self.time() 1113 no_lock = self.nolock() 1114 self.lock() 1115 if rate is not None: 1116 if rate <= 0: 1117 return dict() 1118 if account not in self._vault['exchange']: 1119 self._vault['exchange'][account] = {} 1120 if len(self._vault['exchange'][account]) == 0 and rate <= 1: 1121 return {"time": created, "rate": 1, "description": None} 1122 self._vault['exchange'][account][created] = {"rate": rate, "description": description} 1123 self._step(Action.EXCHANGE, account, ref=created, value=rate) 1124 if no_lock: 1125 self.free(self.lock()) 1126 if debug: 1127 print("exchange-created-1", 1128 f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1129 1130 if account in self._vault['exchange']: 1131 valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created] 1132 if valid_rates: 1133 latest_rate = max(valid_rates, key=lambda x: x[0]) 1134 if debug: 1135 print("exchange-read-1", 1136 f'account: {account}, created: {created}, rate:{rate}, description:{description}', 1137 'latest_rate', latest_rate) 1138 result = latest_rate[1] 1139 result['time'] = latest_rate[0] 1140 return result # إرجاع قاموس يحتوي على المعدل والوصف 1141 if debug: 1142 print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1143 return {"time": created, "rate": 1, "description": None} # إرجاع القيمة الافتراضية مع وصف فارغ
This method is used to record or retrieve exchange rates for a specific account.
Parameters:
- account (str): The account number for which the exchange rate is being recorded or retrieved.
- created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used.
- rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate.
- description (str): A description of the exchange rate.
Returns:
- dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, it returns a dictionary with default values for the rate and description.
1145 @staticmethod 1146 def exchange_calc(x: float, x_rate: float, y_rate: float) -> float: 1147 """ 1148 This function calculates the exchanged amount of a currency. 1149 1150 Args: 1151 x (float): The original amount of the currency. 1152 x_rate (float): The exchange rate of the original currency. 1153 y_rate (float): The exchange rate of the target currency. 1154 1155 Returns: 1156 float: The exchanged amount of the target currency. 1157 """ 1158 return (x * x_rate) / y_rate
This function calculates the exchanged amount of a currency.
Args: x (float): The original amount of the currency. x_rate (float): The exchange rate of the original currency. y_rate (float): The exchange rate of the target currency.
Returns: float: The exchanged amount of the target currency.
1160 def exchanges(self) -> dict: 1161 """ 1162 Retrieve the recorded exchange rates for all accounts. 1163 1164 Parameters: 1165 None 1166 1167 Returns: 1168 dict: A dictionary containing all recorded exchange rates. 1169 The keys are account names or numbers, and the values are dictionaries containing the exchange rates. 1170 Each exchange rate dictionary has timestamps as keys and exchange rate details as values. 1171 """ 1172 return self._vault['exchange'].copy()
Retrieve the recorded exchange rates for all accounts.
Parameters: None
Returns: dict: A dictionary containing all recorded exchange rates. The keys are account names or numbers, and the values are dictionaries containing the exchange rates. Each exchange rate dictionary has timestamps as keys and exchange rate details as values.
1174 def accounts(self) -> dict: 1175 """ 1176 Returns a dictionary containing account numbers as keys and their respective balances as values. 1177 1178 Parameters: 1179 None 1180 1181 Returns: 1182 dict: A dictionary where keys are account numbers and values are their respective balances. 1183 """ 1184 result = {} 1185 for i in self._vault['account']: 1186 result[i] = self._vault['account'][i]['balance'] 1187 return result
Returns a dictionary containing account numbers as keys and their respective balances as values.
Parameters: None
Returns: dict: A dictionary where keys are account numbers and values are their respective balances.
1189 def boxes(self, account) -> dict: 1190 """ 1191 Retrieve the boxes (transactions) associated with a specific account. 1192 1193 Parameters: 1194 account (str): The account number for which to retrieve the boxes. 1195 1196 Returns: 1197 dict: A dictionary containing the boxes associated with the given account. 1198 If the account does not exist, an empty dictionary is returned. 1199 """ 1200 if self.account_exists(account): 1201 return self._vault['account'][account]['box'] 1202 return {}
Retrieve the boxes (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the boxes.
Returns: dict: A dictionary containing the boxes associated with the given account. If the account does not exist, an empty dictionary is returned.
1204 def logs(self, account) -> dict: 1205 """ 1206 Retrieve the logs (transactions) associated with a specific account. 1207 1208 Parameters: 1209 account (str): The account number for which to retrieve the logs. 1210 1211 Returns: 1212 dict: A dictionary containing the logs associated with the given account. 1213 If the account does not exist, an empty dictionary is returned. 1214 """ 1215 if self.account_exists(account): 1216 return self._vault['account'][account]['log'] 1217 return {}
Retrieve the logs (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the logs.
Returns: dict: A dictionary containing the logs associated with the given account. If the account does not exist, an empty dictionary is returned.
1219 def add_file(self, account: str, ref: int, path: str) -> int: 1220 """ 1221 Adds a file reference to a specific transaction log entry in the vault. 1222 1223 Parameters: 1224 account (str): The account number associated with the transaction log. 1225 ref (int): The reference to the transaction log entry. 1226 path (str): The path of the file to be added. 1227 1228 Returns: 1229 int: The reference of the added file. If the account or transaction log entry does not exist, returns 0. 1230 """ 1231 if self.account_exists(account): 1232 if ref in self._vault['account'][account]['log']: 1233 file_ref = self.time() 1234 self._vault['account'][account]['log'][ref]['file'][file_ref] = path 1235 no_lock = self.nolock() 1236 self.lock() 1237 self._step(Action.ADD_FILE, account, ref=ref, file=file_ref) 1238 if no_lock: 1239 self.free(self.lock()) 1240 return file_ref 1241 return 0
Adds a file reference to a specific transaction log entry in the vault.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry. path (str): The path of the file to be added.
Returns: int: The reference of the added file. If the account or transaction log entry does not exist, returns 0.
1243 def remove_file(self, account: str, ref: int, file_ref: int) -> bool: 1244 """ 1245 Removes a file reference from a specific transaction log entry in the vault. 1246 1247 Parameters: 1248 account (str): The account number associated with the transaction log. 1249 ref (int): The reference to the transaction log entry. 1250 file_ref (int): The reference of the file to be removed. 1251 1252 Returns: 1253 bool: True if the file reference is successfully removed, False otherwise. 1254 """ 1255 if self.account_exists(account): 1256 if ref in self._vault['account'][account]['log']: 1257 if file_ref in self._vault['account'][account]['log'][ref]['file']: 1258 x = self._vault['account'][account]['log'][ref]['file'][file_ref] 1259 del self._vault['account'][account]['log'][ref]['file'][file_ref] 1260 no_lock = self.nolock() 1261 self.lock() 1262 self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x) 1263 if no_lock: 1264 self.free(self.lock()) 1265 return True 1266 return False
Removes a file reference from a specific transaction log entry in the vault.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry. file_ref (int): The reference of the file to be removed.
Returns: bool: True if the file reference is successfully removed, False otherwise.
1268 def balance(self, account: str = 1, cached: bool = True) -> int: 1269 """ 1270 Calculate and return the balance of a specific account. 1271 1272 Parameters: 1273 account (str): The account number. Default is '1'. 1274 cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True. 1275 1276 Returns: 1277 int: The balance of the account. 1278 1279 Note: 1280 If cached is True, the function returns the cached balance. 1281 If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items. 1282 """ 1283 if cached: 1284 return self._vault['account'][account]['balance'] 1285 x = 0 1286 return [x := x + y['rest'] for y in self._vault['account'][account]['box'].values()][-1]
Calculate and return the balance of a specific account.
Parameters: account (str): The account number. Default is '1'. cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True.
Returns: int: The balance of the account.
Note: If cached is True, the function returns the cached balance. If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items.
1288 def hide(self, account, status: bool = None) -> bool: 1289 """ 1290 Check or set the hide status of a specific account. 1291 1292 Parameters: 1293 account (str): The account number. 1294 status (bool, optional): The new hide status. If not provided, the function will return the current status. 1295 1296 Returns: 1297 bool: The current or updated hide status of the account. 1298 1299 Raises: 1300 None 1301 1302 Example: 1303 >>> tracker = ZakatTracker() 1304 >>> ref = tracker.track(51, 'desc', 'account1') 1305 >>> tracker.hide('account1') # Set the hide status of 'account1' to True 1306 False 1307 >>> tracker.hide('account1', True) # Set the hide status of 'account1' to True 1308 True 1309 >>> tracker.hide('account1') # Get the hide status of 'account1' by default 1310 True 1311 >>> tracker.hide('account1', False) 1312 False 1313 """ 1314 if self.account_exists(account): 1315 if status is None: 1316 return self._vault['account'][account]['hide'] 1317 self._vault['account'][account]['hide'] = status 1318 return status 1319 return False
Check or set the hide status of a specific account.
Parameters: account (str): The account number. status (bool, optional): The new hide status. If not provided, the function will return the current status.
Returns: bool: The current or updated hide status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.hide('account1') # Set the hide status of 'account1' to True
False
>>> tracker.hide('account1', True) # Set the hide status of 'account1' to True
True
>>> tracker.hide('account1') # Get the hide status of 'account1' by default
True
>>> tracker.hide('account1', False)
False
1321 def zakatable(self, account, status: bool = None) -> bool: 1322 """ 1323 Check or set the zakatable status of a specific account. 1324 1325 Parameters: 1326 account (str): The account number. 1327 status (bool, optional): The new zakatable status. If not provided, the function will return the current status. 1328 1329 Returns: 1330 bool: The current or updated zakatable status of the account. 1331 1332 Raises: 1333 None 1334 1335 Example: 1336 >>> tracker = ZakatTracker() 1337 >>> ref = tracker.track(51, 'desc', 'account1') 1338 >>> tracker.zakatable('account1') # Set the zakatable status of 'account1' to True 1339 True 1340 >>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True 1341 True 1342 >>> tracker.zakatable('account1') # Get the zakatable status of 'account1' by default 1343 True 1344 >>> tracker.zakatable('account1', False) 1345 False 1346 """ 1347 if self.account_exists(account): 1348 if status is None: 1349 return self._vault['account'][account]['zakatable'] 1350 self._vault['account'][account]['zakatable'] = status 1351 return status 1352 return False
Check or set the zakatable status of a specific account.
Parameters: account (str): The account number. status (bool, optional): The new zakatable status. If not provided, the function will return the current status.
Returns: bool: The current or updated zakatable status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.zakatable('account1') # Set the zakatable status of 'account1' to True
True
>>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True
True
>>> tracker.zakatable('account1') # Get the zakatable status of 'account1' by default
True
>>> tracker.zakatable('account1', False)
False
1354 def sub(self, unscaled_value: float | int | Decimal, desc: str = '', account: str = 1, created: int = None, 1355 debug: bool = False) \ 1356 -> tuple[ 1357 int, 1358 list[ 1359 tuple[int, int], 1360 ], 1361 ] | tuple: 1362 """ 1363 Subtracts a specified value from an account's balance. 1364 1365 Parameters: 1366 unscaled_value (float | int | Decimal): The amount to be subtracted. 1367 desc (str): A description for the transaction. Defaults to an empty string. 1368 account (str): The account from which the value will be subtracted. Defaults to '1'. 1369 created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. 1370 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1371 1372 Returns: 1373 tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction. 1374 1375 If the amount to subtract is greater than the account's balance, 1376 the remaining amount will be transferred to a new transaction with a negative value. 1377 1378 Raises: 1379 ValueError: The box transaction happened again in the same nanosecond time. 1380 ValueError: The log transaction happened again in the same nanosecond time. 1381 """ 1382 if debug: 1383 print('sub', f'debug={debug}') 1384 if unscaled_value < 0: 1385 return tuple() 1386 if unscaled_value == 0: 1387 ref = self.track(unscaled_value, '', account) 1388 return ref, ref 1389 if created is None: 1390 created = self.time() 1391 no_lock = self.nolock() 1392 self.lock() 1393 self.track(0, '', account) 1394 value = self.scale(unscaled_value) 1395 self._log(value=-value, desc=desc, account=account, created=created, ref=None, debug=debug) 1396 ids = sorted(self._vault['account'][account]['box'].keys()) 1397 limit = len(ids) + 1 1398 target = value 1399 if debug: 1400 print('ids', ids) 1401 ages = [] 1402 for i in range(-1, -limit, -1): 1403 if target == 0: 1404 break 1405 j = ids[i] 1406 if debug: 1407 print('i', i, 'j', j) 1408 rest = self._vault['account'][account]['box'][j]['rest'] 1409 if rest >= target: 1410 self._vault['account'][account]['box'][j]['rest'] -= target 1411 self._step(Action.SUB, account, ref=j, value=target) 1412 ages.append((j, target)) 1413 target = 0 1414 break 1415 elif target > rest > 0: 1416 chunk = rest 1417 target -= chunk 1418 self._step(Action.SUB, account, ref=j, value=chunk) 1419 ages.append((j, chunk)) 1420 self._vault['account'][account]['box'][j]['rest'] = 0 1421 if target > 0: 1422 self.track( 1423 unscaled_value=self.unscale(-target), 1424 desc=desc, 1425 account=account, 1426 logging=False, 1427 created=created, 1428 ) 1429 ages.append((created, target)) 1430 if no_lock: 1431 self.free(self.lock()) 1432 return created, ages
Subtracts a specified value from an account's balance.
Parameters: unscaled_value (float | int | Decimal): The amount to be subtracted. desc (str): A description for the transaction. Defaults to an empty string. account (str): The account from which the value will be subtracted. Defaults to '1'. created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction.
If the amount to subtract is greater than the account's balance, the remaining amount will be transferred to a new transaction with a negative value.
Raises: ValueError: The box transaction happened again in the same nanosecond time. ValueError: The log transaction happened again in the same nanosecond time.
1434 def transfer(self, unscaled_amount: float | int | Decimal, from_account: str, to_account: str, desc: str = '', 1435 created: int = None, 1436 debug: bool = False) -> list[int]: 1437 """ 1438 Transfers a specified value from one account to another. 1439 1440 Parameters: 1441 unscaled_amount (float | int | Decimal): The amount to be transferred. 1442 from_account (str): The account from which the value will be transferred. 1443 to_account (str): The account to which the value will be transferred. 1444 desc (str, optional): A description for the transaction. Defaults to an empty string. 1445 created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. 1446 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1447 1448 Returns: 1449 list[int]: A list of timestamps corresponding to the transactions made during the transfer. 1450 1451 Raises: 1452 ValueError: Transfer to the same account is forbidden. 1453 ValueError: The box transaction happened again in the same nanosecond time. 1454 ValueError: The log transaction happened again in the same nanosecond time. 1455 """ 1456 if debug: 1457 print('transfer', f'debug={debug}') 1458 if from_account == to_account: 1459 raise ValueError(f'Transfer to the same account is forbidden. {to_account}') 1460 if unscaled_amount <= 0: 1461 return [] 1462 if created is None: 1463 created = self.time() 1464 (_, ages) = self.sub(unscaled_amount, desc, from_account, created, debug=debug) 1465 times = [] 1466 source_exchange = self.exchange(from_account, created) 1467 target_exchange = self.exchange(to_account, created) 1468 1469 if debug: 1470 print('ages', ages) 1471 1472 for age, value in ages: 1473 target_amount = int(self.exchange_calc(value, source_exchange['rate'], target_exchange['rate'])) 1474 if debug: 1475 print('target_amount', target_amount) 1476 # Perform the transfer 1477 if self.box_exists(to_account, age): 1478 if debug: 1479 print('box_exists', age) 1480 capital = self._vault['account'][to_account]['box'][age]['capital'] 1481 rest = self._vault['account'][to_account]['box'][age]['rest'] 1482 if debug: 1483 print( 1484 f"Transfer(loop) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1485 selected_age = age 1486 if rest + target_amount > capital: 1487 self._vault['account'][to_account]['box'][age]['capital'] += target_amount 1488 selected_age = ZakatTracker.time() 1489 self._vault['account'][to_account]['box'][age]['rest'] += target_amount 1490 self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount) 1491 y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account, 1492 created=None, ref=None, debug=debug) 1493 times.append((age, y)) 1494 continue 1495 if debug: 1496 print( 1497 f"Transfer(func) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1498 y = self.track( 1499 unscaled_value=self.unscale(int(target_amount)), 1500 desc=desc, 1501 account=to_account, 1502 logging=True, 1503 created=age, 1504 debug=debug, 1505 ) 1506 times.append(y) 1507 return times
Transfers a specified value from one account to another.
Parameters: unscaled_amount (float | int | Decimal): The amount to be transferred. from_account (str): The account from which the value will be transferred. to_account (str): The account to which the value will be transferred. desc (str, optional): A description for the transaction. Defaults to an empty string. created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: list[int]: A list of timestamps corresponding to the transactions made during the transfer.
Raises: ValueError: Transfer to the same account is forbidden. ValueError: The box transaction happened again in the same nanosecond time. ValueError: The log transaction happened again in the same nanosecond time.
1509 def check(self, silver_gram_price: float, unscaled_nisab: float | int | Decimal = None, debug: bool = False, now: int = None, 1510 cycle: float = None) -> tuple: 1511 """ 1512 Check the eligibility for Zakat based on the given parameters. 1513 1514 Parameters: 1515 silver_gram_price (float): The price of a gram of silver. 1516 unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, 1517 it will be calculated based on the silver_gram_price. 1518 debug (bool): Flag to enable debug mode. 1519 now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). 1520 cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle(). 1521 1522 Returns: 1523 tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, 1524 and a dictionary containing the Zakat plan. 1525 """ 1526 if debug: 1527 print('check', f'debug={debug}') 1528 if now is None: 1529 now = self.time() 1530 if cycle is None: 1531 cycle = ZakatTracker.TimeCycle() 1532 if unscaled_nisab is None: 1533 unscaled_nisab = ZakatTracker.Nisab(silver_gram_price) 1534 nisab = self.scale(unscaled_nisab) 1535 plan = {} 1536 below_nisab = 0 1537 brief = [0, 0, 0] 1538 valid = False 1539 if debug: 1540 print('exchanges', self.exchanges()) 1541 for x in self._vault['account']: 1542 if not self.zakatable(x): 1543 continue 1544 _box = self._vault['account'][x]['box'] 1545 _log = self._vault['account'][x]['log'] 1546 limit = len(_box) + 1 1547 ids = sorted(self._vault['account'][x]['box'].keys()) 1548 for i in range(-1, -limit, -1): 1549 j = ids[i] 1550 rest = float(_box[j]['rest']) 1551 if rest <= 0: 1552 continue 1553 exchange = self.exchange(x, created=self.time()) 1554 rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1) 1555 brief[0] += rest 1556 index = limit + i - 1 1557 epoch = (now - j) / cycle 1558 if debug: 1559 print(f"Epoch: {epoch}", _box[j]) 1560 if _box[j]['last'] > 0: 1561 epoch = (now - _box[j]['last']) / cycle 1562 if debug: 1563 print(f"Epoch: {epoch}") 1564 epoch = floor(epoch) 1565 if debug: 1566 print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch) 1567 if epoch == 0: 1568 continue 1569 if debug: 1570 print("Epoch - PASSED") 1571 brief[1] += rest 1572 if rest >= nisab: 1573 total = 0 1574 for _ in range(epoch): 1575 total += ZakatTracker.ZakatCut(float(rest) - float(total)) 1576 if total > 0: 1577 if x not in plan: 1578 plan[x] = {} 1579 valid = True 1580 brief[2] += total 1581 plan[x][index] = { 1582 'total': total, 1583 'count': epoch, 1584 'box_time': j, 1585 'box_capital': _box[j]['capital'], 1586 'box_rest': _box[j]['rest'], 1587 'box_last': _box[j]['last'], 1588 'box_total': _box[j]['total'], 1589 'box_count': _box[j]['count'], 1590 'box_log': _log[j]['desc'], 1591 'exchange_rate': exchange['rate'], 1592 'exchange_time': exchange['time'], 1593 'exchange_desc': exchange['description'], 1594 } 1595 else: 1596 chunk = ZakatTracker.ZakatCut(float(rest)) 1597 if chunk > 0: 1598 if x not in plan: 1599 plan[x] = {} 1600 if j not in plan[x].keys(): 1601 plan[x][index] = {} 1602 below_nisab += rest 1603 brief[2] += chunk 1604 plan[x][index]['below_nisab'] = chunk 1605 plan[x][index]['total'] = chunk 1606 plan[x][index]['count'] = epoch 1607 plan[x][index]['box_time'] = j 1608 plan[x][index]['box_capital'] = _box[j]['capital'] 1609 plan[x][index]['box_rest'] = _box[j]['rest'] 1610 plan[x][index]['box_last'] = _box[j]['last'] 1611 plan[x][index]['box_total'] = _box[j]['total'] 1612 plan[x][index]['box_count'] = _box[j]['count'] 1613 plan[x][index]['box_log'] = _log[j]['desc'] 1614 plan[x][index]['exchange_rate'] = exchange['rate'] 1615 plan[x][index]['exchange_time'] = exchange['time'] 1616 plan[x][index]['exchange_desc'] = exchange['description'] 1617 valid = valid or below_nisab >= nisab 1618 if debug: 1619 print(f"below_nisab({below_nisab}) >= nisab({nisab})") 1620 return valid, brief, plan
Check the eligibility for Zakat based on the given parameters.
Parameters: silver_gram_price (float): The price of a gram of silver. unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, it will be calculated based on the silver_gram_price. debug (bool): Flag to enable debug mode. now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle().
Returns: tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, and a dictionary containing the Zakat plan.
1622 def build_payment_parts(self, demand: float, positive_only: bool = True) -> dict: 1623 """ 1624 Build payment parts for the Zakat distribution. 1625 1626 Parameters: 1627 demand (float): The total demand for payment in local currency. 1628 positive_only (bool): If True, only consider accounts with positive balance. Default is True. 1629 1630 Returns: 1631 dict: A dictionary containing the payment parts for each account. The dictionary has the following structure: 1632 { 1633 'account': { 1634 'account_id': {'balance': float, 'rate': float, 'part': float}, 1635 ... 1636 }, 1637 'exceed': bool, 1638 'demand': float, 1639 'total': float, 1640 } 1641 """ 1642 total = 0 1643 parts = { 1644 'account': {}, 1645 'exceed': False, 1646 'demand': demand, 1647 } 1648 for x, y in self.accounts().items(): 1649 if positive_only and y <= 0: 1650 continue 1651 total += float(y) 1652 exchange = self.exchange(x) 1653 parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0} 1654 parts['total'] = total 1655 return parts
Build payment parts for the Zakat distribution.
Parameters: demand (float): The total demand for payment in local currency. positive_only (bool): If True, only consider accounts with positive balance. Default is True.
Returns: dict: A dictionary containing the payment parts for each account. The dictionary has the following structure: { 'account': { 'account_id': {'balance': float, 'rate': float, 'part': float}, ... }, 'exceed': bool, 'demand': float, 'total': float, }
1657 @staticmethod 1658 def check_payment_parts(parts: dict, debug: bool = False) -> int: 1659 """ 1660 Checks the validity of payment parts. 1661 1662 Parameters: 1663 parts (dict): A dictionary containing payment parts information. 1664 debug (bool): Flag to enable debug mode. 1665 1666 Returns: 1667 int: Returns 0 if the payment parts are valid, otherwise returns the error code. 1668 1669 Error Codes: 1670 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 1671 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 1672 3: 'part' value in parts['account'][x] is less than 0. 1673 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 1674 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 1675 6: The sum of 'part' values in parts['account'] does not match with 'demand' value. 1676 """ 1677 if debug: 1678 print('check_payment_parts', f'debug={debug}') 1679 for i in ['demand', 'account', 'total', 'exceed']: 1680 if i not in parts: 1681 return 1 1682 exceed = parts['exceed'] 1683 for x in parts['account']: 1684 for j in ['balance', 'rate', 'part']: 1685 if j not in parts['account'][x]: 1686 return 2 1687 if parts['account'][x]['part'] < 0: 1688 return 3 1689 if not exceed and parts['account'][x]['balance'] <= 0: 1690 return 4 1691 demand = parts['demand'] 1692 z = 0 1693 for _, y in parts['account'].items(): 1694 if not exceed and y['part'] > y['balance']: 1695 return 5 1696 z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1) 1697 z = round(z, 2) 1698 demand = round(demand, 2) 1699 if debug: 1700 print('check_payment_parts', f'z = {z}, demand = {demand}') 1701 print('check_payment_parts', type(z), type(demand)) 1702 print('check_payment_parts', z != demand) 1703 print('check_payment_parts', str(z) != str(demand)) 1704 if z != demand and str(z) != str(demand): 1705 return 6 1706 return 0
Checks the validity of payment parts.
Parameters: parts (dict): A dictionary containing payment parts information. debug (bool): Flag to enable debug mode.
Returns: int: Returns 0 if the payment parts are valid, otherwise returns the error code.
Error Codes: 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 3: 'part' value in parts['account'][x] is less than 0. 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 6: The sum of 'part' values in parts['account'] does not match with 'demand' value.
1708 def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool: 1709 """ 1710 Perform Zakat calculation based on the given report and optional parts. 1711 1712 Parameters: 1713 report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. 1714 parts (dict): A dictionary containing the payment parts for the zakat. 1715 debug (bool): A flag indicating whether to print debug information. 1716 1717 Returns: 1718 bool: True if the zakat calculation is successful, False otherwise. 1719 """ 1720 if debug: 1721 print('zakat', f'debug={debug}') 1722 valid, _, plan = report 1723 if not valid: 1724 return valid 1725 parts_exist = parts is not None 1726 if parts_exist: 1727 if self.check_payment_parts(parts, debug=debug) != 0: 1728 return False 1729 if debug: 1730 print('######### zakat #######') 1731 print('parts_exist', parts_exist) 1732 no_lock = self.nolock() 1733 self.lock() 1734 report_time = self.time() 1735 self._vault['report'][report_time] = report 1736 self._step(Action.REPORT, ref=report_time) 1737 created = self.time() 1738 for x in plan: 1739 target_exchange = self.exchange(x) 1740 if debug: 1741 print(plan[x]) 1742 print('-------------') 1743 print(self._vault['account'][x]['box']) 1744 ids = sorted(self._vault['account'][x]['box'].keys()) 1745 if debug: 1746 print('plan[x]', plan[x]) 1747 for i in plan[x].keys(): 1748 j = ids[i] 1749 if debug: 1750 print('i', i, 'j', j) 1751 self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'], 1752 key='last', 1753 math_operation=MathOperation.EQUAL) 1754 self._vault['account'][x]['box'][j]['last'] = created 1755 amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate'])) 1756 self._vault['account'][x]['box'][j]['total'] += amount 1757 self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total', 1758 math_operation=MathOperation.ADDITION) 1759 self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count'] 1760 self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count', 1761 math_operation=MathOperation.ADDITION) 1762 if not parts_exist: 1763 try: 1764 self._vault['account'][x]['box'][j]['rest'] -= amount 1765 except TypeError: 1766 self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount) 1767 # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest', 1768 # math_operation=MathOperation.SUBTRACTION) 1769 self._log(-float(amount), desc='zakat-زكاة', account=x, created=None, ref=j, debug=debug) 1770 if parts_exist: 1771 for account, part in parts['account'].items(): 1772 if part['part'] == 0: 1773 continue 1774 if debug: 1775 print('zakat-part', account, part['rate']) 1776 target_exchange = self.exchange(account) 1777 amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate']) 1778 self.sub(amount, desc='zakat-part-دفعة-زكاة', account=account, debug=debug) 1779 if no_lock: 1780 self.free(self.lock()) 1781 return True
Perform Zakat calculation based on the given report and optional parts.
Parameters: report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. parts (dict): A dictionary containing the payment parts for the zakat. debug (bool): A flag indicating whether to print debug information.
Returns: bool: True if the zakat calculation is successful, False otherwise.
1783 def export_json(self, path: str = "data.json") -> bool: 1784 """ 1785 Exports the current state of the ZakatTracker object to a JSON file. 1786 1787 Parameters: 1788 path (str): The path where the JSON file will be saved. Default is "data.json". 1789 1790 Returns: 1791 bool: True if the export is successful, False otherwise. 1792 1793 Raises: 1794 No specific exceptions are raised by this method. 1795 """ 1796 with open(path, "w") as file: 1797 json.dump(self._vault, file, indent=4, cls=JSONEncoder) 1798 return True
Exports the current state of the ZakatTracker object to a JSON file.
Parameters: path (str): The path where the JSON file will be saved. Default is "data.json".
Returns: bool: True if the export is successful, False otherwise.
Raises: No specific exceptions are raised by this method.
1800 def save(self, path: str = None) -> bool: 1801 """ 1802 Saves the ZakatTracker's current state to a camel file. 1803 1804 This method serializes the internal data (`_vault`). 1805 1806 Parameters: 1807 path (str, optional): File path for saving. Defaults to a predefined location. 1808 1809 Returns: 1810 bool: True if the save operation is successful, False otherwise. 1811 """ 1812 if path is None: 1813 path = self.path() 1814 with open(f'{path}.tmp', 'w') as stream: 1815 # first save in tmp file 1816 stream.write(camel.dump(self._vault)) 1817 # then move tmp file to original location 1818 shutil.move(f'{path}.tmp', path) 1819 return True
Saves the ZakatTracker's current state to a camel file.
This method serializes the internal data (_vault
).
Parameters: path (str, optional): File path for saving. Defaults to a predefined location.
Returns: bool: True if the save operation is successful, False otherwise.
1821 def load(self, path: str = None) -> bool: 1822 """ 1823 Load the current state of the ZakatTracker object from a camel file. 1824 1825 Parameters: 1826 path (str): The path where the camel file is located. If not provided, it will use the default path. 1827 1828 Returns: 1829 bool: True if the load operation is successful, False otherwise. 1830 """ 1831 if path is None: 1832 path = self.path() 1833 if os.path.exists(path): 1834 with open(path, 'r') as stream: 1835 self._vault = camel.load(stream.read()) 1836 return True 1837 return False
Load the current state of the ZakatTracker object from a camel file.
Parameters: path (str): The path where the camel file is located. If not provided, it will use the default path.
Returns: bool: True if the load operation is successful, False otherwise.
1839 def import_csv_cache_path(self): 1840 """ 1841 Generates the cache file path for imported CSV data. 1842 1843 This function constructs the file path where cached data from CSV imports 1844 will be stored. The cache file is a camel file (.camel extension) appended 1845 to the base path of the object. 1846 1847 Returns: 1848 str: The full path to the import CSV cache file. 1849 1850 Example: 1851 >>> obj = ZakatTracker('/data/reports') 1852 >>> obj.import_csv_cache_path() 1853 '/data/reports.import_csv.camel' 1854 """ 1855 path = str(self.path()) 1856 ext = self.ext() 1857 ext_len = len(ext) 1858 if path.endswith(f'.{ext}'): 1859 path = path[:-ext_len-1] 1860 _, filename = os.path.split(path + f'.import_csv.{ext}') 1861 return self.base_path(filename)
Generates the cache file path for imported CSV data.
This function constructs the file path where cached data from CSV imports will be stored. The cache file is a camel file (.camel extension) appended to the base path of the object.
Returns: str: The full path to the import CSV cache file.
Example:
obj = ZakatTracker('/data/reports') obj.import_csv_cache_path() '/data/reports.import_csv.camel'
1863 def import_csv(self, path: str = 'file.csv', scale_decimal_places: int = 0, debug: bool = False) -> tuple: 1864 """ 1865 The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system. 1866 1867 Parameters: 1868 path (str): The path to the CSV file. Default is 'file.csv'. 1869 scale_decimal_places (int): The number of decimal places to scale the value. Default is 0. 1870 debug (bool): A flag indicating whether to print debug information. 1871 1872 Returns: 1873 tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, 1874 and a dictionary of bad transactions. 1875 1876 Notes: 1877 * Currency Pair Assumption: This function assumes that the exchange rates stored for each account 1878 are appropriate for the currency pairs involved in the conversions. 1879 * The exchange rate for each account is based on the last encountered transaction rate that is not equal 1880 to 1.0 or the previous rate for that account. 1881 * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent 1882 transactions of the same account within the whole imported and existing dataset when doing `check` and 1883 `zakat` operations. 1884 1885 Example Usage: 1886 The CSV file should have the following format, rate is optional per transaction: 1887 account, desc, value, date, rate 1888 For example: 1889 safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1 1890 """ 1891 if debug: 1892 print('import_csv', f'debug={debug}') 1893 cache: list[int] = [] 1894 try: 1895 with open(self.import_csv_cache_path(), 'r') as stream: 1896 cache = camel.load(stream.read()) 1897 except: 1898 pass 1899 date_formats = [ 1900 "%Y-%m-%d %H:%M:%S", 1901 "%Y-%m-%dT%H:%M:%S", 1902 "%Y-%m-%dT%H%M%S", 1903 "%Y-%m-%d", 1904 ] 1905 created, found, bad = 0, 0, {} 1906 data: dict[int, list] = {} 1907 with open(path, newline='', encoding="utf-8") as f: 1908 i = 0 1909 for row in csv.reader(f, delimiter=','): 1910 i += 1 1911 hashed = hash(tuple(row)) 1912 if hashed in cache: 1913 found += 1 1914 continue 1915 account = row[0] 1916 desc = row[1] 1917 value = float(row[2]) 1918 rate = 1.0 1919 if row[4:5]: # Empty list if index is out of range 1920 rate = float(row[4]) 1921 date: int = 0 1922 for time_format in date_formats: 1923 try: 1924 date = self.time(datetime.datetime.strptime(row[3], time_format)) 1925 break 1926 except: 1927 pass 1928 # TODO: not allowed for negative dates in the future after enhance time functions 1929 if date == 0: 1930 bad[i] = row + ['invalid date'] 1931 if value == 0: 1932 bad[i] = row + ['invalid value'] 1933 continue 1934 if date not in data: 1935 data[date] = [] 1936 data[date].append((i, account, desc, value, date, rate, hashed)) 1937 1938 if debug: 1939 print('import_csv', len(data)) 1940 1941 if bad: 1942 return created, found, bad 1943 1944 for date, rows in sorted(data.items()): 1945 try: 1946 len_rows = len(rows) 1947 if len_rows == 1: 1948 (_, account, desc, unscaled_value, date, rate, hashed) = rows[0] 1949 value = self.unscale(unscaled_value, decimal_places=scale_decimal_places) if scale_decimal_places > 0 else unscaled_value 1950 if rate > 0: 1951 self.exchange(account=account, created=date, rate=rate) 1952 if value > 0: 1953 self.track(unscaled_value=value, desc=desc, account=account, logging=True, created=date) 1954 elif value < 0: 1955 self.sub(unscaled_value=-value, desc=desc, account=account, created=date) 1956 created += 1 1957 cache.append(hashed) 1958 continue 1959 if debug: 1960 print('-- Duplicated time detected', date, 'len', len_rows) 1961 print(rows) 1962 print('---------------------------------') 1963 # If records are found at the same time with different accounts in the same amount 1964 # (one positive and the other negative), this indicates it is a transfer. 1965 if len_rows != 2: 1966 raise Exception(f'more than two transactions({len_rows}) at the same time') 1967 (i, account1, desc1, unscaled_value1, date1, rate1, _) = rows[0] 1968 (j, account2, desc2, unscaled_value2, date2, rate2, _) = rows[1] 1969 if account1 == account2 or desc1 != desc2 or abs(unscaled_value1) != abs(unscaled_value2) or date1 != date2: 1970 raise Exception('invalid transfer') 1971 if rate1 > 0: 1972 self.exchange(account1, created=date1, rate=rate1) 1973 if rate2 > 0: 1974 self.exchange(account2, created=date2, rate=rate2) 1975 value1 = self.unscale(unscaled_value1, decimal_places=scale_decimal_places) if scale_decimal_places > 0 else unscaled_value1 1976 value2 = self.unscale(unscaled_value2, decimal_places=scale_decimal_places) if scale_decimal_places > 0 else unscaled_value2 1977 values = { 1978 value1: account1, 1979 value2: account2, 1980 } 1981 self.transfer( 1982 unscaled_amount=abs(value1), 1983 from_account=values[min(values.keys())], 1984 to_account=values[max(values.keys())], 1985 desc=desc1, 1986 created=date1, 1987 ) 1988 except Exception as e: 1989 for (i, account, desc, value, date, rate, _) in rows: 1990 bad[i] = (account, desc, value, date, rate, e) 1991 break 1992 with open(self.import_csv_cache_path(), 'w') as stream: 1993 stream.write(camel.dump(cache)) 1994 return created, found, bad
The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system.
Parameters: path (str): The path to the CSV file. Default is 'file.csv'. scale_decimal_places (int): The number of decimal places to scale the value. Default is 0. debug (bool): A flag indicating whether to print debug information.
Returns: tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, and a dictionary of bad transactions.
Notes:
* Currency Pair Assumption: This function assumes that the exchange rates stored for each account
are appropriate for the currency pairs involved in the conversions.
* The exchange rate for each account is based on the last encountered transaction rate that is not equal
to 1.0 or the previous rate for that account.
* Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent
transactions of the same account within the whole imported and existing dataset when doing check
and
zakat
operations.
Example Usage: The CSV file should have the following format, rate is optional per transaction: account, desc, value, date, rate For example: safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1
2000 @staticmethod 2001 def human_readable_size(size: float, decimal_places: int = 2) -> str: 2002 """ 2003 Converts a size in bytes to a human-readable format (e.g., KB, MB, GB). 2004 2005 This function iterates through progressively larger units of information 2006 (B, KB, MB, GB, etc.) and divides the input size until it fits within a 2007 range that can be expressed with a reasonable number before the unit. 2008 2009 Parameters: 2010 size (float): The size in bytes to convert. 2011 decimal_places (int, optional): The number of decimal places to display 2012 in the result. Defaults to 2. 2013 2014 Returns: 2015 str: A string representation of the size in a human-readable format, 2016 rounded to the specified number of decimal places. For example: 2017 - "1.50 KB" (1536 bytes) 2018 - "23.00 MB" (24117248 bytes) 2019 - "1.23 GB" (1325899906 bytes) 2020 """ 2021 if type(size) not in (float, int): 2022 raise TypeError("size must be a float or integer") 2023 if type(decimal_places) != int: 2024 raise TypeError("decimal_places must be an integer") 2025 for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: 2026 if size < 1024.0: 2027 break 2028 size /= 1024.0 2029 return f"{size:.{decimal_places}f} {unit}"
Converts a size in bytes to a human-readable format (e.g., KB, MB, GB).
This function iterates through progressively larger units of information (B, KB, MB, GB, etc.) and divides the input size until it fits within a range that can be expressed with a reasonable number before the unit.
Parameters: size (float): The size in bytes to convert. decimal_places (int, optional): The number of decimal places to display in the result. Defaults to 2.
Returns: str: A string representation of the size in a human-readable format, rounded to the specified number of decimal places. For example: - "1.50 KB" (1536 bytes) - "23.00 MB" (24117248 bytes) - "1.23 GB" (1325899906 bytes)
2031 @staticmethod 2032 def get_dict_size(obj: dict, seen: set = None) -> float: 2033 """ 2034 Recursively calculates the approximate memory size of a dictionary and its contents in bytes. 2035 2036 This function traverses the dictionary structure, accounting for the size of keys, values, 2037 and any nested objects. It handles various data types commonly found in dictionaries 2038 (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case 2039 of circular references. 2040 2041 Parameters: 2042 obj (dict): The dictionary whose size is to be calculated. 2043 seen (set, optional): A set used internally to track visited objects 2044 and avoid circular references. Defaults to None. 2045 2046 Returns: 2047 float: An approximate size of the dictionary and its contents in bytes. 2048 2049 Note: 2050 - This function is a method of the `ZakatTracker` class and is likely used to 2051 estimate the memory footprint of data structures relevant to Zakat calculations. 2052 - The size calculation is approximate as it relies on `sys.getsizeof()`, which might 2053 not account for all memory overhead depending on the Python implementation. 2054 - Circular references are handled to prevent infinite recursion. 2055 - Basic numeric types (int, float, complex) are assumed to have fixed sizes. 2056 - String sizes are estimated based on character length and encoding. 2057 """ 2058 size = 0 2059 if seen is None: 2060 seen = set() 2061 2062 obj_id = id(obj) 2063 if obj_id in seen: 2064 return 0 2065 2066 seen.add(obj_id) 2067 size += sys.getsizeof(obj) 2068 2069 if isinstance(obj, dict): 2070 for k, v in obj.items(): 2071 size += ZakatTracker.get_dict_size(k, seen) 2072 size += ZakatTracker.get_dict_size(v, seen) 2073 elif isinstance(obj, (list, tuple, set, frozenset)): 2074 for item in obj: 2075 size += ZakatTracker.get_dict_size(item, seen) 2076 elif isinstance(obj, (int, float, complex)): # Handle numbers 2077 pass # Basic numbers have a fixed size, so nothing to add here 2078 elif isinstance(obj, str): # Handle strings 2079 size += len(obj) * sys.getsizeof(str().encode()) # Size per character in bytes 2080 return size
Recursively calculates the approximate memory size of a dictionary and its contents in bytes.
This function traverses the dictionary structure, accounting for the size of keys, values, and any nested objects. It handles various data types commonly found in dictionaries (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case of circular references.
Parameters: obj (dict): The dictionary whose size is to be calculated. seen (set, optional): A set used internally to track visited objects and avoid circular references. Defaults to None.
Returns: float: An approximate size of the dictionary and its contents in bytes.
Note:
- This function is a method of the
ZakatTracker
class and is likely used to estimate the memory footprint of data structures relevant to Zakat calculations. - The size calculation is approximate as it relies on
sys.getsizeof()
, which might not account for all memory overhead depending on the Python implementation. - Circular references are handled to prevent infinite recursion.
- Basic numeric types (int, float, complex) are assumed to have fixed sizes.
- String sizes are estimated based on character length and encoding.
2082 @staticmethod 2083 def duration_from_nanoseconds(ns: int, 2084 show_zeros_in_spoken_time: bool = False, 2085 spoken_time_separator=',', 2086 millennia: str = 'Millennia', 2087 century: str = 'Century', 2088 years: str = 'Years', 2089 days: str = 'Days', 2090 hours: str = 'Hours', 2091 minutes: str = 'Minutes', 2092 seconds: str = 'Seconds', 2093 milli_seconds: str = 'MilliSeconds', 2094 micro_seconds: str = 'MicroSeconds', 2095 nano_seconds: str = 'NanoSeconds', 2096 ) -> tuple: 2097 """ 2098 REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 2099 Convert NanoSeconds to Human Readable Time Format. 2100 A NanoSeconds is a unit of time in the International System of Units (SI) equal 2101 to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second. 2102 Its symbol is μs, sometimes simplified to us when Unicode is not available. 2103 A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond. 2104 2105 INPUT : ms (AKA: MilliSeconds) 2106 OUTPUT: tuple(string time_lapsed, string spoken_time) like format. 2107 OUTPUT Variables: time_lapsed, spoken_time 2108 2109 Example Input: duration_from_nanoseconds(ns) 2110 **"Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds"** 2111 Example Output: ('039:0001:047:325:05:02:03:456:789:012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds') 2112 duration_from_nanoseconds(1234567890123456789012) 2113 """ 2114 us, ns = divmod(ns, 1000) 2115 ms, us = divmod(us, 1000) 2116 s, ms = divmod(ms, 1000) 2117 m, s = divmod(s, 60) 2118 h, m = divmod(m, 60) 2119 d, h = divmod(h, 24) 2120 y, d = divmod(d, 365) 2121 c, y = divmod(y, 100) 2122 n, c = divmod(c, 10) 2123 time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}" 2124 spoken_time_part = [] 2125 if n > 0 or show_zeros_in_spoken_time: 2126 spoken_time_part.append(f"{n: 3d} {millennia}") 2127 if c > 0 or show_zeros_in_spoken_time: 2128 spoken_time_part.append(f"{c: 4d} {century}") 2129 if y > 0 or show_zeros_in_spoken_time: 2130 spoken_time_part.append(f"{y: 3d} {years}") 2131 if d > 0 or show_zeros_in_spoken_time: 2132 spoken_time_part.append(f"{d: 4d} {days}") 2133 if h > 0 or show_zeros_in_spoken_time: 2134 spoken_time_part.append(f"{h: 2d} {hours}") 2135 if m > 0 or show_zeros_in_spoken_time: 2136 spoken_time_part.append(f"{m: 2d} {minutes}") 2137 if s > 0 or show_zeros_in_spoken_time: 2138 spoken_time_part.append(f"{s: 2d} {seconds}") 2139 if ms > 0 or show_zeros_in_spoken_time: 2140 spoken_time_part.append(f"{ms: 3d} {milli_seconds}") 2141 if us > 0 or show_zeros_in_spoken_time: 2142 spoken_time_part.append(f"{us: 3d} {micro_seconds}") 2143 if ns > 0 or show_zeros_in_spoken_time: 2144 spoken_time_part.append(f"{ns: 3d} {nano_seconds}") 2145 return time_lapsed, spoken_time_separator.join(spoken_time_part)
REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 Convert NanoSeconds to Human Readable Time Format. A NanoSeconds is a unit of time in the International System of Units (SI) equal to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second. Its symbol is μs, sometimes simplified to us when Unicode is not available. A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond.
INPUT : ms (AKA: MilliSeconds) OUTPUT: tuple(string time_lapsed, string spoken_time) like format. OUTPUT Variables: time_lapsed, spoken_time
Example Input: duration_from_nanoseconds(ns) "Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds" Example Output: ('039:0001:047:325:05:02:03:456:789:012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds') duration_from_nanoseconds(1234567890123456789012)
2147 @staticmethod 2148 def day_to_time(day: int, month: int = 6, year: int = 2024) -> int: # افتراض أن الشهر هو يونيو والسنة 2024 2149 """ 2150 Convert a specific day, month, and year into a timestamp. 2151 2152 Parameters: 2153 day (int): The day of the month. 2154 month (int): The month of the year. Default is 6 (June). 2155 year (int): The year. Default is 2024. 2156 2157 Returns: 2158 int: The timestamp representing the given day, month, and year. 2159 2160 Note: 2161 This method assumes the default month and year if not provided. 2162 """ 2163 return ZakatTracker.time(datetime.datetime(year, month, day))
Convert a specific day, month, and year into a timestamp.
Parameters: day (int): The day of the month. month (int): The month of the year. Default is 6 (June). year (int): The year. Default is 2024.
Returns: int: The timestamp representing the given day, month, and year.
Note: This method assumes the default month and year if not provided.
2165 @staticmethod 2166 def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime: 2167 """ 2168 Generate a random date between two given dates. 2169 2170 Parameters: 2171 start_date (datetime.datetime): The start date from which to generate a random date. 2172 end_date (datetime.datetime): The end date until which to generate a random date. 2173 2174 Returns: 2175 datetime.datetime: A random date between the start_date and end_date. 2176 """ 2177 time_between_dates = end_date - start_date 2178 days_between_dates = time_between_dates.days 2179 random_number_of_days = random.randrange(days_between_dates) 2180 return start_date + datetime.timedelta(days=random_number_of_days)
Generate a random date between two given dates.
Parameters: start_date (datetime.datetime): The start date from which to generate a random date. end_date (datetime.datetime): The end date until which to generate a random date.
Returns: datetime.datetime: A random date between the start_date and end_date.
2182 @staticmethod 2183 def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False, 2184 debug: bool = False) -> int: 2185 """ 2186 Generate a random CSV file with specified parameters. 2187 2188 Parameters: 2189 path (str): The path where the CSV file will be saved. Default is "data.csv". 2190 count (int): The number of rows to generate in the CSV file. Default is 1000. 2191 with_rate (bool): If True, a random rate between 1.2% and 12% is added. Default is False. 2192 debug (bool): A flag indicating whether to print debug information. 2193 2194 Returns: 2195 None. The function generates a CSV file at the specified path with the given count of rows. 2196 Each row contains a randomly generated account, description, value, and date. 2197 The value is randomly generated between 1000 and 100000, 2198 and the date is randomly generated between 1950-01-01 and 2023-12-31. 2199 If the row number is not divisible by 13, the value is multiplied by -1. 2200 """ 2201 if debug: 2202 print('generate_random_csv_file', f'debug={debug}') 2203 i = 0 2204 with open(path, "w", newline="") as csvfile: 2205 writer = csv.writer(csvfile) 2206 for i in range(count): 2207 account = f"acc-{random.randint(1, 1000)}" 2208 desc = f"Some text {random.randint(1, 1000)}" 2209 value = random.randint(1000, 100000) 2210 date = ZakatTracker.generate_random_date(datetime.datetime(1000, 1, 1), 2211 datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S") 2212 if not i % 13 == 0: 2213 value *= -1 2214 row = [account, desc, value, date] 2215 if with_rate: 2216 rate = random.randint(1, 100) * 0.12 2217 if debug: 2218 print('before-append', row) 2219 row.append(rate) 2220 if debug: 2221 print('after-append', row) 2222 writer.writerow(row) 2223 i = i + 1 2224 return i
Generate a random CSV file with specified parameters.
Parameters: path (str): The path where the CSV file will be saved. Default is "data.csv". count (int): The number of rows to generate in the CSV file. Default is 1000. with_rate (bool): If True, a random rate between 1.2% and 12% is added. Default is False. debug (bool): A flag indicating whether to print debug information.
Returns: None. The function generates a CSV file at the specified path with the given count of rows. Each row contains a randomly generated account, description, value, and date. The value is randomly generated between 1000 and 100000, and the date is randomly generated between 1950-01-01 and 2023-12-31. If the row number is not divisible by 13, the value is multiplied by -1.
2226 @staticmethod 2227 def create_random_list(max_sum, min_value=0, max_value=10): 2228 """ 2229 Creates a list of random integers whose sum does not exceed the specified maximum. 2230 2231 Args: 2232 max_sum: The maximum allowed sum of the list elements. 2233 min_value: The minimum possible value for an element (inclusive). 2234 max_value: The maximum possible value for an element (inclusive). 2235 2236 Returns: 2237 A list of random integers. 2238 """ 2239 result = [] 2240 current_sum = 0 2241 2242 while current_sum < max_sum: 2243 # Calculate the remaining space for the next element 2244 remaining_sum = max_sum - current_sum 2245 # Determine the maximum possible value for the next element 2246 next_max_value = min(remaining_sum, max_value) 2247 # Generate a random element within the allowed range 2248 next_element = random.randint(min_value, next_max_value) 2249 result.append(next_element) 2250 current_sum += next_element 2251 2252 return result
Creates a list of random integers whose sum does not exceed the specified maximum.
Args: max_sum: The maximum allowed sum of the list elements. min_value: The minimum possible value for an element (inclusive). max_value: The maximum possible value for an element (inclusive).
Returns: A list of random integers.
2483 def test(self, debug: bool = False) -> bool: 2484 if debug: 2485 print('test', f'debug={debug}') 2486 try: 2487 2488 self._test_core(True, debug) 2489 self._test_core(False, debug) 2490 2491 assert self._history() 2492 2493 # Not allowed for duplicate transactions in the same account and time 2494 2495 created = ZakatTracker.time() 2496 self.track(100, 'test-1', 'same', True, created) 2497 failed = False 2498 try: 2499 self.track(50, 'test-1', 'same', True, created) 2500 except: 2501 failed = True 2502 assert failed is True 2503 2504 self.reset() 2505 2506 # Same account transfer 2507 for x in [1, 'a', True, 1.8, None]: 2508 failed = False 2509 try: 2510 self.transfer(1, x, x, 'same-account', debug=debug) 2511 except: 2512 failed = True 2513 assert failed is True 2514 2515 # Always preserve box age during transfer 2516 2517 series: list[tuple] = [ 2518 (30, 4), 2519 (60, 3), 2520 (90, 2), 2521 ] 2522 case = { 2523 3000: { 2524 'series': series, 2525 'rest': 15000, 2526 }, 2527 6000: { 2528 'series': series, 2529 'rest': 12000, 2530 }, 2531 9000: { 2532 'series': series, 2533 'rest': 9000, 2534 }, 2535 18000: { 2536 'series': series, 2537 'rest': 0, 2538 }, 2539 27000: { 2540 'series': series, 2541 'rest': -9000, 2542 }, 2543 36000: { 2544 'series': series, 2545 'rest': -18000, 2546 }, 2547 } 2548 2549 selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle() 2550 2551 for total in case: 2552 if debug: 2553 print('--------------------------------------------------------') 2554 print(f'case[{total}]', case[total]) 2555 for x in case[total]['series']: 2556 self.track( 2557 unscaled_value=x[0], 2558 desc=f"test-{x} ages", 2559 account='ages', 2560 logging=True, 2561 created=selected_time * x[1], 2562 ) 2563 2564 unscaled_total = self.unscale(total) 2565 if debug: 2566 print('unscaled_total', unscaled_total) 2567 refs = self.transfer( 2568 unscaled_amount=unscaled_total, 2569 from_account='ages', 2570 to_account='future', 2571 desc='Zakat Movement', 2572 debug=debug, 2573 ) 2574 2575 if debug: 2576 print('refs', refs) 2577 2578 ages_cache_balance = self.balance('ages') 2579 ages_fresh_balance = self.balance('ages', False) 2580 rest = case[total]['rest'] 2581 if debug: 2582 print('source', ages_cache_balance, ages_fresh_balance, rest) 2583 assert ages_cache_balance == rest 2584 assert ages_fresh_balance == rest 2585 2586 future_cache_balance = self.balance('future') 2587 future_fresh_balance = self.balance('future', False) 2588 if debug: 2589 print('target', future_cache_balance, future_fresh_balance, total) 2590 print('refs', refs) 2591 assert future_cache_balance == total 2592 assert future_fresh_balance == total 2593 2594 # TODO: check boxes times for `ages` should equal box times in `future` 2595 for ref in self._vault['account']['ages']['box']: 2596 ages_capital = self._vault['account']['ages']['box'][ref]['capital'] 2597 ages_rest = self._vault['account']['ages']['box'][ref]['rest'] 2598 future_capital = 0 2599 if ref in self._vault['account']['future']['box']: 2600 future_capital = self._vault['account']['future']['box'][ref]['capital'] 2601 future_rest = 0 2602 if ref in self._vault['account']['future']['box']: 2603 future_rest = self._vault['account']['future']['box'][ref]['rest'] 2604 if ages_capital != 0 and future_capital != 0 and future_rest != 0: 2605 if debug: 2606 print('================================================================') 2607 print('ages', ages_capital, ages_rest) 2608 print('future', future_capital, future_rest) 2609 if ages_rest == 0: 2610 assert ages_capital == future_capital 2611 elif ages_rest < 0: 2612 assert -ages_capital == future_capital 2613 elif ages_rest > 0: 2614 assert ages_capital == ages_rest + future_capital 2615 self.reset() 2616 assert len(self._vault['history']) == 0 2617 2618 assert self._history() 2619 assert self._history(False) is False 2620 assert self._history() is False 2621 assert self._history(True) 2622 assert self._history() 2623 if debug: 2624 print('####################################################################') 2625 2626 transaction = [ 2627 ( 2628 20, 'wallet', 1, -2000, -2000, -2000, 1, 1, 2629 2000, 2000, 2000, 1, 1, 2630 ), 2631 ( 2632 750, 'wallet', 'safe', -77000, -77000, -77000, 2, 2, 2633 75000, 75000, 75000, 1, 1, 2634 ), 2635 ( 2636 600, 'safe', 'bank', 15000, 15000, 15000, 1, 2, 2637 60000, 60000, 60000, 1, 1, 2638 ), 2639 ] 2640 for z in transaction: 2641 self.lock() 2642 x = z[1] 2643 y = z[2] 2644 self.transfer( 2645 unscaled_amount=z[0], 2646 from_account=x, 2647 to_account=y, 2648 desc='test-transfer', 2649 debug=debug, 2650 ) 2651 zz = self.balance(x) 2652 if debug: 2653 print(zz, z) 2654 assert zz == z[3] 2655 xx = self.accounts()[x] 2656 assert xx == z[3] 2657 assert self.balance(x, False) == z[4] 2658 assert xx == z[4] 2659 2660 s = 0 2661 log = self._vault['account'][x]['log'] 2662 for i in log: 2663 s += log[i]['value'] 2664 if debug: 2665 print('s', s, 'z[5]', z[5]) 2666 assert s == z[5] 2667 2668 assert self.box_size(x) == z[6] 2669 assert self.log_size(x) == z[7] 2670 2671 yy = self.accounts()[y] 2672 assert self.balance(y) == z[8] 2673 assert yy == z[8] 2674 assert self.balance(y, False) == z[9] 2675 assert yy == z[9] 2676 2677 s = 0 2678 log = self._vault['account'][y]['log'] 2679 for i in log: 2680 s += log[i]['value'] 2681 assert s == z[10] 2682 2683 assert self.box_size(y) == z[11] 2684 assert self.log_size(y) == z[12] 2685 assert self.free(self.lock()) 2686 2687 if debug: 2688 pp().pprint(self.check(2.17)) 2689 2690 assert not self.nolock() 2691 history_count = len(self._vault['history']) 2692 if debug: 2693 print('history-count', history_count) 2694 assert history_count == 4 2695 assert not self.free(ZakatTracker.time()) 2696 assert self.free(self.lock()) 2697 assert self.nolock() 2698 assert len(self._vault['history']) == 3 2699 2700 # storage 2701 2702 _path = self.path(f'./zakat_test_db/test.{self.ext()}') 2703 if os.path.exists(_path): 2704 os.remove(_path) 2705 self.save() 2706 assert os.path.getsize(_path) > 0 2707 self.reset() 2708 assert self.recall(False, debug) is False 2709 self.load() 2710 assert self._vault['account'] is not None 2711 2712 # recall 2713 2714 assert self.nolock() 2715 assert len(self._vault['history']) == 3 2716 assert self.recall(False, debug) is True 2717 assert len(self._vault['history']) == 2 2718 assert self.recall(False, debug) is True 2719 assert len(self._vault['history']) == 1 2720 assert self.recall(False, debug) is True 2721 assert len(self._vault['history']) == 0 2722 assert self.recall(False, debug) is False 2723 assert len(self._vault['history']) == 0 2724 2725 # exchange 2726 2727 self.exchange("cash", 25, 3.75, "2024-06-25") 2728 self.exchange("cash", 22, 3.73, "2024-06-22") 2729 self.exchange("cash", 15, 3.69, "2024-06-15") 2730 self.exchange("cash", 10, 3.66) 2731 2732 for i in range(1, 30): 2733 exchange = self.exchange("cash", i) 2734 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2735 if debug: 2736 print(i, rate, description, created) 2737 assert created 2738 if i < 10: 2739 assert rate == 1 2740 assert description is None 2741 elif i == 10: 2742 assert rate == 3.66 2743 assert description is None 2744 elif i < 15: 2745 assert rate == 3.66 2746 assert description is None 2747 elif i == 15: 2748 assert rate == 3.69 2749 assert description is not None 2750 elif i < 22: 2751 assert rate == 3.69 2752 assert description is not None 2753 elif i == 22: 2754 assert rate == 3.73 2755 assert description is not None 2756 elif i >= 25: 2757 assert rate == 3.75 2758 assert description is not None 2759 exchange = self.exchange("bank", i) 2760 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2761 if debug: 2762 print(i, rate, description, created) 2763 assert created 2764 assert rate == 1 2765 assert description is None 2766 2767 assert len(self._vault['exchange']) > 0 2768 assert len(self.exchanges()) > 0 2769 self._vault['exchange'].clear() 2770 assert len(self._vault['exchange']) == 0 2771 assert len(self.exchanges()) == 0 2772 2773 # حفظ أسعار الصرف باستخدام التواريخ بالنانو ثانية 2774 self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25") 2775 self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22") 2776 self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15") 2777 self.exchange("cash", ZakatTracker.day_to_time(10), 3.66) 2778 2779 for i in [x * 0.12 for x in range(-15, 21)]: 2780 if i <= 0: 2781 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0 2782 else: 2783 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0 2784 2785 # اختبار النتائج باستخدام التواريخ بالنانو ثانية 2786 for i in range(1, 31): 2787 timestamp_ns = ZakatTracker.day_to_time(i) 2788 exchange = self.exchange("cash", timestamp_ns) 2789 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2790 if debug: 2791 print(i, rate, description, created) 2792 assert created 2793 if i < 10: 2794 assert rate == 1 2795 assert description is None 2796 elif i == 10: 2797 assert rate == 3.66 2798 assert description is None 2799 elif i < 15: 2800 assert rate == 3.66 2801 assert description is None 2802 elif i == 15: 2803 assert rate == 3.69 2804 assert description is not None 2805 elif i < 22: 2806 assert rate == 3.69 2807 assert description is not None 2808 elif i == 22: 2809 assert rate == 3.73 2810 assert description is not None 2811 elif i >= 25: 2812 assert rate == 3.75 2813 assert description is not None 2814 exchange = self.exchange("bank", i) 2815 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2816 if debug: 2817 print(i, rate, description, created) 2818 assert created 2819 assert rate == 1 2820 assert description is None 2821 2822 # csv 2823 2824 csv_count = 1000 2825 2826 for with_rate, path in { 2827 False: 'test-import_csv-no-exchange', 2828 True: 'test-import_csv-with-exchange', 2829 }.items(): 2830 2831 if debug: 2832 print('test_import_csv', with_rate, path) 2833 2834 csv_path = path + '.csv' 2835 if os.path.exists(csv_path): 2836 os.remove(csv_path) 2837 c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug) 2838 if debug: 2839 print('generate_random_csv_file', c) 2840 assert c == csv_count 2841 assert os.path.getsize(csv_path) > 0 2842 cache_path = self.import_csv_cache_path() 2843 if os.path.exists(cache_path): 2844 os.remove(cache_path) 2845 self.reset() 2846 (created, found, bad) = self.import_csv(csv_path, debug) 2847 bad_count = len(bad) 2848 assert bad_count > 0 2849 if debug: 2850 print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})") 2851 print('bad', bad) 2852 tmp_size = os.path.getsize(cache_path) 2853 assert tmp_size > 0 2854 # TODO: assert created + found + bad_count == csv_count 2855 # TODO: assert created == csv_count 2856 # TODO: assert bad_count == 0 2857 (created_2, found_2, bad_2) = self.import_csv(csv_path) 2858 bad_2_count = len(bad_2) 2859 if debug: 2860 print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})") 2861 print('bad', bad) 2862 assert bad_2_count > 0 2863 # TODO: assert tmp_size == os.path.getsize(cache_path) 2864 # TODO: assert created_2 + found_2 + bad_2_count == csv_count 2865 # TODO: assert created == found_2 2866 # TODO: assert bad_count == bad_2_count 2867 # TODO: assert found_2 == csv_count 2868 # TODO: assert bad_2_count == 0 2869 # TODO: assert created_2 == 0 2870 2871 # payment parts 2872 2873 positive_parts = self.build_payment_parts(100, positive_only=True) 2874 assert self.check_payment_parts(positive_parts) != 0 2875 assert self.check_payment_parts(positive_parts) != 0 2876 all_parts = self.build_payment_parts(300, positive_only=False) 2877 assert self.check_payment_parts(all_parts) != 0 2878 assert self.check_payment_parts(all_parts) != 0 2879 if debug: 2880 pp().pprint(positive_parts) 2881 pp().pprint(all_parts) 2882 # dynamic discount 2883 suite = [] 2884 count = 3 2885 for exceed in [False, True]: 2886 case = [] 2887 for parts in [positive_parts, all_parts]: 2888 part = parts.copy() 2889 demand = part['demand'] 2890 if debug: 2891 print(demand, part['total']) 2892 i = 0 2893 z = demand / count 2894 cp = { 2895 'account': {}, 2896 'demand': demand, 2897 'exceed': exceed, 2898 'total': part['total'], 2899 } 2900 j = '' 2901 for x, y in part['account'].items(): 2902 x_exchange = self.exchange(x) 2903 zz = self.exchange_calc(z, 1, x_exchange['rate']) 2904 if exceed and zz <= demand: 2905 i += 1 2906 y['part'] = zz 2907 if debug: 2908 print(exceed, y) 2909 cp['account'][x] = y 2910 case.append(y) 2911 elif not exceed and y['balance'] >= zz: 2912 i += 1 2913 y['part'] = zz 2914 if debug: 2915 print(exceed, y) 2916 cp['account'][x] = y 2917 case.append(y) 2918 j = x 2919 if i >= count: 2920 break 2921 if len(cp['account'][j]) > 0: 2922 suite.append(cp) 2923 if debug: 2924 print('suite', len(suite)) 2925 # vault = self._vault.copy() 2926 for case in suite: 2927 # self._vault = vault.copy() 2928 if debug: 2929 print('case', case) 2930 result = self.check_payment_parts(case) 2931 if debug: 2932 print('check_payment_parts', result, f'exceed: {exceed}') 2933 assert result == 0 2934 2935 report = self.check(2.17, None, debug) 2936 (valid, brief, plan) = report 2937 if debug: 2938 print('valid', valid) 2939 zakat_result = self.zakat(report, parts=case, debug=debug) 2940 if debug: 2941 print('zakat-result', zakat_result) 2942 assert valid == zakat_result 2943 2944 assert self.save(path + f'.{self.ext()}') 2945 assert self.export_json(path + '.json') 2946 2947 assert self.export_json("1000-transactions-test.json") 2948 assert self.save(f"1000-transactions-test.{self.ext()}") 2949 2950 self.reset() 2951 2952 # test transfer between accounts with different exchange rate 2953 2954 a_SAR = "Bank (SAR)" 2955 b_USD = "Bank (USD)" 2956 c_SAR = "Safe (SAR)" 2957 # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer 2958 for case in [ 2959 (0, a_SAR, "SAR Gift", 1000, 100000), 2960 (1, a_SAR, 1), 2961 (0, b_USD, "USD Gift", 500, 50000), 2962 (1, b_USD, 1), 2963 (2, b_USD, 3.75), 2964 (1, b_USD, 3.75), 2965 (3, 100, b_USD, a_SAR, "100 USD -> SAR", 40000, 137500), 2966 (0, c_SAR, "Salary", 750, 75000), 2967 (3, 375, c_SAR, b_USD, "375 SAR -> USD", 37500, 50000), 2968 (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 137125, 50100), 2969 ]: 2970 if debug: 2971 print('case', case) 2972 match (case[0]): 2973 case 0: # track 2974 _, account, desc, x, balance = case 2975 self.track(unscaled_value=x, desc=desc, account=account, debug=debug) 2976 2977 cached_value = self.balance(account, cached=True) 2978 fresh_value = self.balance(account, cached=False) 2979 if debug: 2980 print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value) 2981 assert cached_value == balance 2982 assert fresh_value == balance 2983 case 1: # check-exchange 2984 _, account, expected_rate = case 2985 t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2986 if debug: 2987 print('t-exchange', t_exchange) 2988 assert t_exchange['rate'] == expected_rate 2989 case 2: # do-exchange 2990 _, account, rate = case 2991 self.exchange(account, rate=rate, debug=debug) 2992 b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2993 if debug: 2994 print('b-exchange', b_exchange) 2995 assert b_exchange['rate'] == rate 2996 case 3: # transfer 2997 _, x, a, b, desc, a_balance, b_balance = case 2998 self.transfer(x, a, b, desc, debug=debug) 2999 3000 cached_value = self.balance(a, cached=True) 3001 fresh_value = self.balance(a, cached=False) 3002 if debug: 3003 print('account', a, 'cached_value', cached_value, 'fresh_value', fresh_value, 'a_balance', a_balance) 3004 assert cached_value == a_balance 3005 assert fresh_value == a_balance 3006 3007 cached_value = self.balance(b, cached=True) 3008 fresh_value = self.balance(b, cached=False) 3009 if debug: 3010 print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value) 3011 assert cached_value == b_balance 3012 assert fresh_value == b_balance 3013 3014 # Transfer all in many chunks randomly from B to A 3015 a_SAR_balance = 137125 3016 b_USD_balance = 50100 3017 b_USD_exchange = self.exchange(b_USD) 3018 amounts = ZakatTracker.create_random_list(b_USD_balance, max_value=1000) 3019 if debug: 3020 print('amounts', amounts) 3021 i = 0 3022 for x in amounts: 3023 if debug: 3024 print(f'{i} - transfer-with-exchange({x})') 3025 self.transfer( 3026 unscaled_amount=self.unscale(x), 3027 from_account=b_USD, 3028 to_account=a_SAR, 3029 desc=f"{x} USD -> SAR", 3030 debug=debug, 3031 ) 3032 3033 b_USD_balance -= x 3034 cached_value = self.balance(b_USD, cached=True) 3035 fresh_value = self.balance(b_USD, cached=False) 3036 if debug: 3037 print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3038 b_USD_balance) 3039 assert cached_value == b_USD_balance 3040 assert fresh_value == b_USD_balance 3041 3042 a_SAR_balance += int(x * b_USD_exchange['rate']) 3043 cached_value = self.balance(a_SAR, cached=True) 3044 fresh_value = self.balance(a_SAR, cached=False) 3045 if debug: 3046 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3047 a_SAR_balance, 'rate', b_USD_exchange['rate']) 3048 assert cached_value == a_SAR_balance 3049 assert fresh_value == a_SAR_balance 3050 i += 1 3051 3052 # Transfer all in many chunks randomly from C to A 3053 c_SAR_balance = 37500 3054 amounts = ZakatTracker.create_random_list(c_SAR_balance, max_value=1000) 3055 if debug: 3056 print('amounts', amounts) 3057 i = 0 3058 for x in amounts: 3059 if debug: 3060 print(f'{i} - transfer-with-exchange({x})') 3061 self.transfer( 3062 unscaled_amount=self.unscale(x), 3063 from_account=c_SAR, 3064 to_account=a_SAR, 3065 desc=f"{x} SAR -> a_SAR", 3066 debug=debug, 3067 ) 3068 3069 c_SAR_balance -= x 3070 cached_value = self.balance(c_SAR, cached=True) 3071 fresh_value = self.balance(c_SAR, cached=False) 3072 if debug: 3073 print('account', c_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3074 c_SAR_balance) 3075 assert cached_value == c_SAR_balance 3076 assert fresh_value == c_SAR_balance 3077 3078 a_SAR_balance += x 3079 cached_value = self.balance(a_SAR, cached=True) 3080 fresh_value = self.balance(a_SAR, cached=False) 3081 if debug: 3082 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3083 a_SAR_balance) 3084 assert cached_value == a_SAR_balance 3085 assert fresh_value == a_SAR_balance 3086 i += 1 3087 3088 assert self.export_json("accounts-transfer-with-exchange-rates.json") 3089 assert self.save(f"accounts-transfer-with-exchange-rates.{self.ext()}") 3090 3091 # check & zakat with exchange rates for many cycles 3092 3093 for rate, values in { 3094 1: { 3095 'in': [1000, 2000, 10000], 3096 'exchanged': [100000, 200000, 1000000], 3097 'out': [2500, 5000, 73140], 3098 }, 3099 3.75: { 3100 'in': [200, 1000, 5000], 3101 'exchanged': [75000, 375000, 1875000], 3102 'out': [1875, 9375, 137138], 3103 }, 3104 }.items(): 3105 a, b, c = values['in'] 3106 m, n, o = values['exchanged'] 3107 x, y, z = values['out'] 3108 if debug: 3109 print('rate', rate, 'values', values) 3110 for case in [ 3111 (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3112 {'safe': {0: {'below_nisab': x}}}, 3113 ], False, m), 3114 (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3115 {'safe': {0: {'count': 1, 'total': y}}}, 3116 ], True, n), 3117 (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [ 3118 {'cave': {0: {'count': 3, 'total': z}}}, 3119 ], True, o), 3120 ]: 3121 if debug: 3122 print(f"############# check(rate: {rate}) #############") 3123 print('case', case) 3124 self.reset() 3125 self.exchange(account=case[1], created=case[2], rate=rate) 3126 self.track(unscaled_value=case[0], desc='test-check', account=case[1], logging=True, created=case[2]) 3127 assert self.snapshot() 3128 3129 # assert self.nolock() 3130 # history_size = len(self._vault['history']) 3131 # print('history_size', history_size) 3132 # assert history_size == 2 3133 assert self.lock() 3134 assert not self.nolock() 3135 report = self.check(2.17, None, debug) 3136 (valid, brief, plan) = report 3137 if debug: 3138 print('brief', brief) 3139 assert valid == case[4] 3140 assert case[5] == brief[0] 3141 assert case[5] == brief[1] 3142 3143 if debug: 3144 pp().pprint(plan) 3145 3146 for x in plan: 3147 assert case[1] == x 3148 if 'total' in case[3][0][x][0].keys(): 3149 assert case[3][0][x][0]['total'] == int(brief[2]) 3150 assert int(plan[x][0]['total']) == case[3][0][x][0]['total'] 3151 assert int(plan[x][0]['count']) == case[3][0][x][0]['count'] 3152 else: 3153 assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab'] 3154 if debug: 3155 pp().pprint(report) 3156 result = self.zakat(report, debug=debug) 3157 if debug: 3158 print('zakat-result', result, case[4]) 3159 assert result == case[4] 3160 report = self.check(2.17, None, debug) 3161 (valid, brief, plan) = report 3162 assert valid is False 3163 3164 history_size = len(self._vault['history']) 3165 if debug: 3166 print('history_size', history_size) 3167 assert history_size == 3 3168 assert not self.nolock() 3169 assert self.recall(False, debug) is False 3170 self.free(self.lock()) 3171 assert self.nolock() 3172 3173 for i in range(3, 0, -1): 3174 history_size = len(self._vault['history']) 3175 if debug: 3176 print('history_size', history_size) 3177 assert history_size == i 3178 assert self.recall(False, debug) is True 3179 3180 assert self.nolock() 3181 assert self.recall(False, debug) is False 3182 3183 history_size = len(self._vault['history']) 3184 if debug: 3185 print('history_size', history_size) 3186 assert history_size == 0 3187 3188 account_size = len(self._vault['account']) 3189 if debug: 3190 print('account_size', account_size) 3191 assert account_size == 0 3192 3193 report_size = len(self._vault['report']) 3194 if debug: 3195 print('report_size', report_size) 3196 assert report_size == 0 3197 3198 assert self.nolock() 3199 return True 3200 except Exception as e: 3201 # pp().pprint(self._vault) 3202 assert self.export_json("test-snapshot.json") 3203 assert self.save(f"test-snapshot.{self.ext()}") 3204 raise e
3207def test(debug: bool = False): 3208 ledger = ZakatTracker("./zakat_test_db/zakat.camel") 3209 start = ZakatTracker.time() 3210 assert ledger.test(debug=debug) 3211 if debug: 3212 print("#########################") 3213 print("######## TEST DONE ########") 3214 print("#########################") 3215 print(ZakatTracker.duration_from_nanoseconds(ZakatTracker.time() - start)) 3216 print("#########################")
76class Action(Enum): 77 CREATE = auto() 78 TRACK = auto() 79 LOG = auto() 80 SUB = auto() 81 ADD_FILE = auto() 82 REMOVE_FILE = auto() 83 BOX_TRANSFER = auto() 84 EXCHANGE = auto() 85 REPORT = auto() 86 ZAKAT = auto()
89class JSONEncoder(json.JSONEncoder): 90 def default(self, obj): 91 if isinstance(obj, Action) or isinstance(obj, MathOperation): 92 return obj.name # Serialize as the enum member's name 93 elif isinstance(obj, Decimal): 94 return float(obj) 95 return super().default(obj)
Extensible JSON https://json.org encoder for Python data structures.
Supports the following objects and types by default:
+-------------------+---------------+ | Python | JSON | +===================+===============+ | dict | object | +-------------------+---------------+ | list, tuple | array | +-------------------+---------------+ | str | string | +-------------------+---------------+ | int, float | number | +-------------------+---------------+ | True | true | +-------------------+---------------+ | False | false | +-------------------+---------------+ | None | null | +-------------------+---------------+
To extend this to recognize other objects, subclass and implement a
.default()
method with another method that returns a serializable
object for o
if possible, otherwise it should call the superclass
implementation (to raise TypeError
).
90 def default(self, obj): 91 if isinstance(obj, Action) or isinstance(obj, MathOperation): 92 return obj.name # Serialize as the enum member's name 93 elif isinstance(obj, Decimal): 94 return float(obj) 95 return super().default(obj)
Implement this method in a subclass such that it returns
a serializable object for o
, or calls the base implementation
(to raise a TypeError
).
For example, to support arbitrary iterators, you could implement default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
# Let the base class default method raise the TypeError
return super().default(o)
55def start_file_server(database_path: str, database_callback: callable = None, csv_callback: callable = None, 56 debug: bool = False) -> tuple: 57 """ 58 Starts a multi-purpose HTTP server to manage file interactions for a Zakat application. 59 60 This server facilitates the following functionalities: 61 62 1. GET /{file_uuid}/get: Download the database file specified by `database_path`. 63 2. GET /{file_uuid}/upload: Display an HTML form for uploading files. 64 3. POST /{file_uuid}/upload: Handle file uploads, distinguishing between: 65 - Database File (.db): Replaces the existing database with the uploaded one. 66 - CSV File (.csv): Imports data from the CSV into the existing database. 67 68 Args: 69 database_path (str): The path to the pickle database file. 70 database_callback (callable, optional): A function to call after a successful database upload. 71 It receives the uploaded database path as its argument. 72 csv_callback (callable, optional): A function to call after a successful CSV upload. It receives the uploaded CSV path, 73 the database path, and the debug flag as its arguments. 74 debug (bool, optional): If True, print debugging information. Defaults to False. 75 76 Returns: 77 Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing: 78 - file_name (str): The name of the database file. 79 - download_url (str): The URL to download the database file. 80 - upload_url (str): The URL to access the file upload form. 81 - server_thread (threading.Thread): The thread running the server. 82 - shutdown_server (Callable[[], None]): A function to gracefully shut down the server. 83 84 Example: 85 _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db") 86 print(f"Download database: {download_url}") 87 print(f"Upload files: {upload_url}") 88 server_thread.start() 89 # ... later ... 90 shutdown_server() 91 """ 92 file_uuid = uuid.uuid4() 93 file_name = os.path.basename(database_path) 94 95 port = find_available_port() 96 download_url = f"http://localhost:{port}/{file_uuid}/get" 97 upload_url = f"http://localhost:{port}/{file_uuid}/upload" 98 99 class Handler(http.server.SimpleHTTPRequestHandler): 100 def do_GET(self): 101 if self.path == f"/{file_uuid}/get": 102 # GET: Serve the existing file 103 try: 104 with open(database_path, "rb") as f: 105 self.send_response(200) 106 self.send_header("Content-type", "application/octet-stream") 107 self.send_header("Content-Disposition", f'attachment; filename="{file_name}"') 108 self.end_headers() 109 self.wfile.write(f.read()) 110 except FileNotFoundError: 111 self.send_error(404, "File not found") 112 elif self.path == f"/{file_uuid}/upload": 113 # GET: Serve the upload form 114 self.send_response(200) 115 self.send_header("Content-type", "text/html") 116 self.end_headers() 117 self.wfile.write(f""" 118 <html lang="en"> 119 <head> 120 <title>Zakat File Server</title> 121 </head> 122 <body> 123 <h1>Zakat File Server</h1> 124 <h3>You can download the <a target="__blank" href="{download_url}">database file</a>...</h3> 125 <h3>Or upload a new file to restore a database or import `CSV` file:</h3> 126 <form action="/{file_uuid}/upload" method="post" enctype="multipart/form-data"> 127 <input type="file" name="file" required><br/> 128 <input type="radio" id="{FileType.Database.value}" name="upload_type" value="{FileType.Database.value}" required> 129 <label for="database">Database File</label><br/> 130 <input type="radio"id="{FileType.CSV.value}" name="upload_type" value="{FileType.CSV.value}"> 131 <label for="csv">CSV File</label><br/> 132 <input type="submit" value="Upload"><br/> 133 </form> 134 </body></html> 135 """.encode()) 136 else: 137 self.send_error(404) 138 139 def do_POST(self): 140 if self.path == f"/{file_uuid}/upload": 141 # POST: Handle request 142 # 1. Get the Form Data 143 form_data = cgi.FieldStorage( 144 fp=self.rfile, 145 headers=self.headers, 146 environ={'REQUEST_METHOD': 'POST'} 147 ) 148 upload_type = form_data.getvalue("upload_type") 149 150 if debug: 151 print('upload_type', upload_type) 152 153 if upload_type not in [FileType.Database.value, FileType.CSV.value]: 154 self.send_error(400, "Invalid upload type") 155 return 156 157 # 2. Extract File Data 158 file_item = form_data['file'] # Assuming 'file' is your file input name 159 160 # 3. Get File Details 161 filename = file_item.filename 162 file_data = file_item.file.read() # Read the file's content 163 164 if debug: 165 print(f'Uploaded filename: {filename}') 166 167 # 4. Define Storage Path for CSV 168 upload_directory = "./uploads" # Create this directory if it doesn't exist 169 os.makedirs(upload_directory, exist_ok=True) 170 file_path = os.path.join(upload_directory, upload_type) 171 172 # 5. Write to Disk 173 with open(file_path, 'wb') as f: 174 f.write(file_data) 175 176 match upload_type: 177 case FileType.Database.value: 178 179 try: 180 # 6. Verify database file 181 # ZakatTracker(db_path=file_path) # FATAL, Circular Imports Error 182 if database_callback is not None: 183 database_callback(file_path) 184 185 # 7. Copy database into the original path 186 shutil.copy2(file_path, database_path) 187 except Exception as e: 188 self.send_error(400, str(e)) 189 return 190 191 case FileType.CSV.value: 192 # 6. Verify CSV file 193 try: 194 # x = ZakatTracker(db_path=database_path) # FATAL, Circular Imports Error 195 # result = x.import_csv(file_path, debug=debug) 196 if csv_callback is not None: 197 result = csv_callback(file_path, database_path, debug) 198 if debug: 199 print(f'CSV imported: {result}') 200 if len(result[2]) != 0: 201 self.send_response(200) 202 self.end_headers() 203 self.wfile.write(json.dumps(result).encode()) 204 return 205 except Exception as e: 206 self.send_error(400, str(e)) 207 return 208 209 self.send_response(200) 210 self.end_headers() 211 self.wfile.write(b"File uploaded successfully.") 212 213 httpd = socketserver.TCPServer(("localhost", port), Handler) 214 server_thread = threading.Thread(target=httpd.serve_forever) 215 216 def shutdown_server(): 217 nonlocal httpd, server_thread 218 httpd.shutdown() 219 httpd.server_close() # Close the socket 220 server_thread.join() # Wait for the thread to finish 221 222 return file_name, download_url, upload_url, server_thread, shutdown_server
Starts a multi-purpose HTTP server to manage file interactions for a Zakat application.
This server facilitates the following functionalities:
- GET /{file_uuid}/get: Download the database file specified by
database_path
. - GET /{file_uuid}/upload: Display an HTML form for uploading files.
- POST /{file_uuid}/upload: Handle file uploads, distinguishing between:
- Database File (.db): Replaces the existing database with the uploaded one.
- CSV File (.csv): Imports data from the CSV into the existing database.
Args: database_path (str): The path to the pickle database file. database_callback (callable, optional): A function to call after a successful database upload. It receives the uploaded database path as its argument. csv_callback (callable, optional): A function to call after a successful CSV upload. It receives the uploaded CSV path, the database path, and the debug flag as its arguments. debug (bool, optional): If True, print debugging information. Defaults to False.
Returns: Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing: - file_name (str): The name of the database file. - download_url (str): The URL to download the database file. - upload_url (str): The URL to access the file upload form. - server_thread (threading.Thread): The thread running the server. - shutdown_server (Callable[[], None]): A function to gracefully shut down the server.
Example: _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db") print(f"Download database: {download_url}") print(f"Upload files: {upload_url}") server_thread.start() # ... later ... shutdown_server()
32def find_available_port() -> int: 33 """ 34 Finds and returns an available TCP port on the local machine. 35 36 This function utilizes a TCP server socket to bind to port 0, which 37 instructs the operating system to automatically assign an available 38 port. The assigned port is then extracted and returned. 39 40 Returns: 41 int: The available TCP port number. 42 43 Raises: 44 OSError: If an error occurs during the port binding process, such 45 as all ports being in use. 46 47 Example: 48 port = find_available_port() 49 print(f"Available port: {port}") 50 """ 51 with socketserver.TCPServer(("localhost", 0), None) as s: 52 return s.server_address[1]
Finds and returns an available TCP port on the local machine.
This function utilizes a TCP server socket to bind to port 0, which instructs the operating system to automatically assign an available port. The assigned port is then extracted and returned.
Returns: int: The available TCP port number.
Raises: OSError: If an error occurs during the port binding process, such as all ports being in use.
Example: port = find_available_port() print(f"Available port: {port}")