zakat
xxx
_____ _ _ _ _ _
|__ /__ _| | ____ _| |_ | | (_) |__ _ __ __ _ _ __ _ _
/ // _| |/ / _
| __| | | | | '_ \| '__/ _` | '__| | | |
/ /| (_| | < (_| | |_ | |___| | |_) | | | (_| | | | |_| |
/______,_|_|___,_|__| |_____|_|_.__/|_| __,_|_| __, |
|___/
"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف ... Never Trust, Always Verify ...
This file provides the ZakatLibrary classes, functions for tracking and calculating Zakat.
1""" 2 _____ _ _ _ _ _ 3|__ /__ _| | ____ _| |_ | | (_) |__ _ __ __ _ _ __ _ _ 4 / // _` | |/ / _` | __| | | | | '_ \| '__/ _` | '__| | | | 5 / /| (_| | < (_| | |_ | |___| | |_) | | | (_| | | | |_| | 6/____\__,_|_|\_\__,_|\__| |_____|_|_.__/|_| \__,_|_| \__, | 7 |___/ 8 9"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف 10... Never Trust, Always Verify ... 11 12This file provides the ZakatLibrary classes, functions for tracking and calculating Zakat. 13""" 14# Importing necessary classes and functions from the main module 15from zakat.zakat_tracker import ( 16 ZakatTracker, 17 test, 18 Action, 19 JSONEncoder, 20 MathOperation, 21) 22 23from zakat.file_server import ( 24 start_file_server, 25 find_available_port, 26 FileType, 27) 28 29# Version information for the module 30__version__ = ZakatTracker.Version() 31__all__ = [ 32 "ZakatTracker", 33 "test", 34 "Action", 35 "JSONEncoder", 36 "MathOperation", 37 "start_file_server", 38 "find_available_port", 39 "FileType", 40]
122class ZakatTracker: 123 """ 124 A class for tracking and calculating Zakat. 125 126 This class provides functionalities for recording transactions, calculating Zakat due, 127 and managing account balances. It also offers features like importing transactions from 128 CSV files, exporting data to JSON format, and saving/loading the tracker state. 129 130 The `ZakatTracker` class is designed to handle both positive and negative transactions, 131 allowing for flexible tracking of financial activities related to Zakat. It also supports 132 the concept of a "Nisab" (minimum threshold for Zakat) and a "haul" (complete one year for Transaction) can calculate Zakat due 133 based on the current silver price. 134 135 The class uses a camel file as its database to persist the tracker state, 136 ensuring data integrity across sessions. It also provides options for enabling or 137 disabling history tracking, allowing users to choose their preferred level of detail. 138 139 In addition, the `ZakatTracker` class includes various helper methods like 140 `time`, `time_to_datetime`, `lock`, `free`, `recall`, `export_json`, 141 and more. These methods provide additional functionalities and flexibility 142 for interacting with and managing the Zakat tracker. 143 144 Attributes: 145 ZakatTracker.ZakatCut (function): A function to calculate the Zakat percentage. 146 ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat. 147 ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price. 148 ZakatTracker.Version (function): The version of the ZakatTracker class. 149 150 Data Structure: 151 The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data. 152 153 _vault (dict): 154 - account (dict): 155 - {account_number} (dict): 156 - balance (int): The current balance of the account. 157 - box (dict): A dictionary storing transaction details. 158 - {timestamp} (dict): 159 - capital (int): The initial amount of the transaction. 160 - count (int): The number of times Zakat has been calculated for this transaction. 161 - last (int): The timestamp of the last Zakat calculation. 162 - rest (int): The remaining amount after Zakat deductions and withdrawal. 163 - total (int): The total Zakat deducted from this transaction. 164 - count (int): The total number of transactions for the account. 165 - log (dict): A dictionary storing transaction logs. 166 - {timestamp} (dict): 167 - value (int): The transaction amount (positive or negative). 168 - desc (str): The description of the transaction. 169 - ref (int): The box reference (positive or None). 170 - file (dict): A dictionary storing file references associated with the transaction. 171 - hide (bool): Indicates whether the account is hidden or not. 172 - zakatable (bool): Indicates whether the account is subject to Zakat. 173 - exchange (dict): 174 - account (dict): 175 - {timestamps} (dict): 176 - rate (float): Exchange rate when compared to local currency. 177 - description (str): The description of the exchange rate. 178 - history (dict): 179 - {timestamp} (list): A list of dictionaries storing the history of actions performed. 180 - {action_dict} (dict): 181 - action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT). 182 - account (str): The account number associated with the action. 183 - ref (int): The reference number of the transaction. 184 - file (int): The reference number of the file (if applicable). 185 - key (str): The key associated with the action (e.g., 'rest', 'total'). 186 - value (int): The value associated with the action. 187 - math (MathOperation): The mathematical operation performed (if applicable). 188 - lock (int or None): The timestamp indicating the current lock status (None if not locked). 189 - report (dict): 190 - {timestamp} (tuple): A tuple storing Zakat report details. 191 192 """ 193 194 @staticmethod 195 def Version() -> str: 196 """ 197 Returns the current version of the software. 198 199 This function returns a string representing the current version of the software, 200 including major, minor, and patch version numbers in the format "X.Y.Z". 201 202 Returns: 203 str: The current version of the software. 204 """ 205 return '0.2.88' 206 207 @staticmethod 208 def ZakatCut(x: float) -> float: 209 """ 210 Calculates the Zakat amount due on an asset. 211 212 This function calculates the zakat amount due on a given asset value over one lunar year. 213 Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth 214 that exceeds a certain threshold (Nisab). 215 216 Parameters: 217 x: The total value of the asset on which Zakat is to be calculated. 218 219 Returns: 220 The amount of Zakat due on the asset, calculated as 2.5% of the asset's value. 221 """ 222 return 0.025 * x # Zakat Cut in one Lunar Year 223 224 @staticmethod 225 def TimeCycle(days: int = 355) -> int: 226 """ 227 Calculates the approximate duration of a lunar year in nanoseconds. 228 229 This function calculates the approximate duration of a lunar year based on the given number of days. 230 It converts the given number of days into nanoseconds for use in high-precision timing applications. 231 232 Parameters: 233 days: The number of days in a lunar year. Defaults to 355, 234 which is an approximation of the average length of a lunar year. 235 236 Returns: 237 The approximate duration of a lunar year in nanoseconds. 238 """ 239 return int(60 * 60 * 24 * days * 1e9) # Lunar Year in nanoseconds 240 241 @staticmethod 242 def Nisab(gram_price: float, gram_quantity: float = 595) -> float: 243 """ 244 Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram. 245 246 This function calculates the Nisab value, which is the minimum threshold of wealth, 247 that makes an individual liable for paying Zakat. 248 The Nisab value is determined by the equivalent value of a specific amount 249 of gold or silver (currently 595 grams in silver) in the local currency. 250 251 Parameters: 252 - gram_price (float): The price per gram of Nisab. 253 - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver. 254 255 Returns: 256 - float: The total value of Nisab based on the given price per gram. 257 """ 258 return gram_price * gram_quantity 259 260 @staticmethod 261 def ext() -> str: 262 """ 263 Returns the file extension used by the ZakatTracker class. 264 265 Returns: 266 str: The file extension used by the ZakatTracker class, which is 'camel'. 267 """ 268 return 'camel' 269 270 def __init__(self, db_path: str = "./zakat_db/zakat.camel", history_mode: bool = True): 271 """ 272 Initialize ZakatTracker with database path and history mode. 273 274 Parameters: 275 db_path (str): The path to the database file. Default is "zakat.camel". 276 history_mode (bool): The mode for tracking history. Default is True. 277 278 Returns: 279 None 280 """ 281 self._base_path = None 282 self._vault_path = None 283 self._vault = None 284 self.reset() 285 self._history(history_mode) 286 self.path(db_path) 287 288 def path(self, path: str = None) -> str: 289 """ 290 Set or get the path to the database file. 291 292 If no path is provided, the current path is returned. 293 If a path is provided, it is set as the new path. 294 The function also creates the necessary directories if the provided path is a file. 295 296 Parameters: 297 path (str): The new path to the database file. If not provided, the current path is returned. 298 299 Returns: 300 str: The current or new path to the database file. 301 """ 302 if path is None: 303 return self._vault_path 304 self._vault_path = Path(path).resolve() 305 base_path = Path(path).resolve() 306 if base_path.is_file() or base_path.suffix: 307 base_path = base_path.parent 308 base_path.mkdir(parents=True, exist_ok=True) 309 self._base_path = base_path 310 return str(self._vault_path) 311 312 def base_path(self, *args) -> str: 313 """ 314 Generate a base path by joining the provided arguments with the existing base path. 315 316 Parameters: 317 *args (str): Variable length argument list of strings to be joined with the base path. 318 319 Returns: 320 str: The generated base path. If no arguments are provided, the existing base path is returned. 321 """ 322 if not args: 323 return str(self._base_path) 324 filtered_args = [] 325 ignored_filename = None 326 for arg in args: 327 if Path(arg).suffix: 328 ignored_filename = arg 329 else: 330 filtered_args.append(arg) 331 base_path = Path(self._base_path) 332 full_path = base_path.joinpath(*filtered_args) 333 full_path.mkdir(parents=True, exist_ok=True) 334 if ignored_filename is not None: 335 return full_path.resolve() / ignored_filename # Join with the ignored filename 336 return str(full_path.resolve()) 337 338 @staticmethod 339 def scale(x: float | int | Decimal, decimal_places: int = 2) -> int: 340 """ 341 Scales a numerical value by a specified power of 10, returning an integer. 342 343 This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and 344 facilitate precise scaling operations, particularly useful in financial or scientific calculations. 345 346 Parameters: 347 x: The numeric value to scale. Can be a floating-point number, integer, or decimal. 348 decimal_places: The exponent for the scaling factor (10**y). Defaults to 2, meaning the input is scaled 349 by a factor of 100 (e.g., converts 1.23 to 123). 350 351 Returns: 352 The scaled value, rounded to the nearest integer. 353 354 Raises: 355 TypeError: If the input `x` is not a valid numeric type. 356 357 Examples: 358 >>> ZakatTracker.scale(3.14159) 359 314 360 >>> ZakatTracker.scale(1234, decimal_places=3) 361 1234000 362 >>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4) 363 50 364 """ 365 if not isinstance(x, (float, int, Decimal)): 366 raise TypeError("Input 'x' must be a float, int, or Decimal.") 367 return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places)) 368 369 @staticmethod 370 def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal: 371 """ 372 Unscales an integer by a power of 10. 373 374 Parameters: 375 x: The integer to unscale. 376 return_type: The desired type for the returned value. Can be float, int, or Decimal. Defaults to float. 377 decimal_places: The power of 10 to use. Defaults to 2. 378 379 Returns: 380 The unscaled number, converted to the specified return_type. 381 382 Raises: 383 TypeError: If the return_type is not float or Decimal. 384 """ 385 if return_type not in (float, Decimal): 386 raise TypeError(f'Invalid return_type({return_type}). Supported types are float, int, and Decimal.') 387 return round(return_type(x / (10 ** decimal_places)), decimal_places) 388 389 def _history(self, status: bool = None) -> bool: 390 """ 391 Enable or disable history tracking. 392 393 Parameters: 394 status (bool): The status of history tracking. Default is True. 395 396 Returns: 397 None 398 """ 399 if status is not None: 400 self._history_mode = status 401 return self._history_mode 402 403 def reset(self) -> None: 404 """ 405 Reset the internal data structure to its initial state. 406 407 Parameters: 408 None 409 410 Returns: 411 None 412 """ 413 self._vault = { 414 'account': {}, 415 'exchange': {}, 416 'history': {}, 417 'lock': None, 418 'report': {}, 419 } 420 421 @staticmethod 422 def time(now: datetime = None) -> int: 423 """ 424 Generates a timestamp based on the provided datetime object or the current datetime. 425 426 Parameters: 427 now (datetime, optional): The datetime object to generate the timestamp from. 428 If not provided, the current datetime is used. 429 430 Returns: 431 int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970), 432 before 1970 will return in negative until 1000AD. 433 """ 434 if now is None: 435 now = datetime.datetime.now() 436 ordinal_day = now.toordinal() 437 ns_in_day = (now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() * 10 ** 9 438 return int((ordinal_day - 719_163) * 86_400_000_000_000 + ns_in_day) 439 440 @staticmethod 441 def time_to_datetime(ordinal_ns: int) -> datetime: 442 """ 443 Converts an ordinal number (number of days since 1000-01-01) to a datetime object. 444 445 Parameters: 446 ordinal_ns (int): The ordinal number of days since 1000-01-01. 447 448 Returns: 449 datetime: The corresponding datetime object. 450 """ 451 ordinal_day = ordinal_ns // 86_400_000_000_000 + 719_163 452 ns_in_day = ordinal_ns % 86_400_000_000_000 453 d = datetime.datetime.fromordinal(ordinal_day) 454 t = datetime.timedelta(seconds=ns_in_day // 10 ** 9) 455 return datetime.datetime.combine(d, datetime.time()) + t 456 457 def clean_history(self, lock: int | None = None) -> int: 458 """ 459 Cleans up the history of actions performed on the ZakatTracker instance. 460 461 Parameters: 462 lock (int, optional): The lock ID is used to clean up the empty history. 463 If not provided, it cleans up the empty history records for all locks. 464 465 Returns: 466 int: The number of locks cleaned up. 467 """ 468 count = 0 469 if lock in self._vault['history']: 470 if len(self._vault['history'][lock]) <= 0: 471 count += 1 472 del self._vault['history'][lock] 473 return count 474 self.free(self.lock()) 475 for lock in self._vault['history']: 476 if len(self._vault['history'][lock]) <= 0: 477 count += 1 478 del self._vault['history'][lock] 479 return count 480 481 def _step(self, action: Action = None, account=None, ref: int = None, file: int = None, value: float = None, 482 key: str = None, math_operation: MathOperation = None) -> int: 483 """ 484 This method is responsible for recording the actions performed on the ZakatTracker. 485 486 Parameters: 487 - action (Action): The type of action performed. 488 - account (str): The account number on which the action was performed. 489 - ref (int): The reference number of the action. 490 - file (int): The file reference number of the action. 491 - value (int): The value associated with the action. 492 - key (str): The key associated with the action. 493 - math_operation (MathOperation): The mathematical operation performed during the action. 494 495 Returns: 496 - int: The lock time of the recorded action. If no lock was performed, it returns 0. 497 """ 498 if not self._history(): 499 return 0 500 lock = self._vault['lock'] 501 if self.nolock(): 502 lock = self._vault['lock'] = self.time() 503 self._vault['history'][lock] = [] 504 if action is None: 505 return lock 506 self._vault['history'][lock].append({ 507 'action': action, 508 'account': account, 509 'ref': ref, 510 'file': file, 511 'key': key, 512 'value': value, 513 'math': math_operation, 514 }) 515 return lock 516 517 def nolock(self) -> bool: 518 """ 519 Check if the vault lock is currently not set. 520 521 Returns: 522 bool: True if the vault lock is not set, False otherwise. 523 """ 524 return self._vault['lock'] is None 525 526 def lock(self) -> int: 527 """ 528 Acquires a lock on the ZakatTracker instance. 529 530 Returns: 531 int: The lock ID. This ID can be used to release the lock later. 532 """ 533 return self._step() 534 535 def vault(self) -> dict: 536 """ 537 Returns a copy of the internal vault dictionary. 538 539 This method is used to retrieve the current state of the ZakatTracker object. 540 It provides a snapshot of the internal data structure, allowing for further 541 processing or analysis. 542 543 Returns: 544 dict: A copy of the internal vault dictionary. 545 """ 546 return self._vault.copy() 547 548 def stats(self) -> dict[str, tuple]: 549 """ 550 Calculates and returns statistics about the object's data storage. 551 552 This method determines the size of the database file on disk and the 553 size of the data currently held in RAM (likely within a dictionary). 554 Both sizes are reported in bytes and in a human-readable format 555 (e.g., KB, MB). 556 557 Returns: 558 dict[str, tuple]: A dictionary containing the following statistics: 559 560 * 'database': A tuple with two elements: 561 - The database file size in bytes (int). 562 - The database file size in human-readable format (str). 563 * 'ram': A tuple with two elements: 564 - The RAM usage (dictionary size) in bytes (int). 565 - The RAM usage in human-readable format (str). 566 567 Example: 568 >>> stats = my_object.stats() 569 >>> print(stats['database']) 570 (256000, '250.0 KB') 571 >>> print(stats['ram']) 572 (12345, '12.1 KB') 573 """ 574 ram_size = self.get_dict_size(self.vault()) 575 file_size = os.path.getsize(self.path()) 576 return { 577 'database': (file_size, self.human_readable_size(file_size)), 578 'ram': (ram_size, self.human_readable_size(ram_size)), 579 } 580 581 def files(self) -> list[dict[str, str | int]]: 582 """ 583 Retrieves information about files associated with this class. 584 585 This class method provides a standardized way to gather details about 586 files used by the class for storage, snapshots, and CSV imports. 587 588 Returns: 589 list[dict[str, str | int]]: A list of dictionaries, each containing information 590 about a specific file: 591 592 * type (str): The type of file ('database', 'snapshot', 'import_csv'). 593 * path (str): The full file path. 594 * exists (bool): Whether the file exists on the filesystem. 595 * size (int): The file size in bytes (0 if the file doesn't exist). 596 * human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB'). 597 598 Example: 599 ``` 600 file_info = MyClass.files() 601 for info in file_info: 602 print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}") 603 ``` 604 """ 605 result = [] 606 for file_type, path in { 607 'database': self.path(), 608 'snapshot': self.snapshot_cache_path(), 609 'import_csv': self.import_csv_cache_path(), 610 }.items(): 611 exists = os.path.exists(path) 612 size = os.path.getsize(path) if exists else 0 613 human_readable_size = self.human_readable_size(size) if exists else 0 614 result.append({ 615 'type': file_type, 616 'path': path, 617 'exists': exists, 618 'size': size, 619 'human_readable_size': human_readable_size, 620 }) 621 return result 622 623 def steps(self) -> dict: 624 """ 625 Returns a copy of the history of steps taken in the ZakatTracker. 626 627 The history is a dictionary where each key is a unique identifier for a step, 628 and the corresponding value is a dictionary containing information about the step. 629 630 Returns: 631 dict: A copy of the history of steps taken in the ZakatTracker. 632 """ 633 return self._vault['history'].copy() 634 635 def free(self, lock: int, auto_save: bool = True) -> bool: 636 """ 637 Releases the lock on the database. 638 639 Parameters: 640 lock (int): The lock ID to be released. 641 auto_save (bool): Whether to automatically save the database after releasing the lock. 642 643 Returns: 644 bool: True if the lock is successfully released and (optionally) saved, False otherwise. 645 """ 646 if lock == self._vault['lock']: 647 self._vault['lock'] = None 648 self.clean_history(lock) 649 if auto_save: 650 return self.save(self.path()) 651 return True 652 return False 653 654 def account_exists(self, account) -> bool: 655 """ 656 Check if the given account exists in the vault. 657 658 Parameters: 659 account (str): The account number to check. 660 661 Returns: 662 bool: True if the account exists, False otherwise. 663 """ 664 return account in self._vault['account'] 665 666 def box_size(self, account) -> int: 667 """ 668 Calculate the size of the box for a specific account. 669 670 Parameters: 671 account (str): The account number for which the box size needs to be calculated. 672 673 Returns: 674 int: The size of the box for the given account. If the account does not exist, -1 is returned. 675 """ 676 if self.account_exists(account): 677 return len(self._vault['account'][account]['box']) 678 return -1 679 680 def log_size(self, account) -> int: 681 """ 682 Get the size of the log for a specific account. 683 684 Parameters: 685 account (str): The account number for which the log size needs to be calculated. 686 687 Returns: 688 int: The size of the log for the given account. If the account does not exist, -1 is returned. 689 """ 690 if self.account_exists(account): 691 return len(self._vault['account'][account]['log']) 692 return -1 693 694 @staticmethod 695 def file_hash(file_path: str, algorithm: str = "blake2b") -> str: 696 """ 697 Calculates the hash of a file using the specified algorithm. 698 699 Parameters: 700 file_path (str): The path to the file. 701 algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b". 702 703 Returns: 704 str: The hexadecimal representation of the file's hash. 705 """ 706 hash_obj = hashlib.new(algorithm) # Create the hash object 707 with open(file_path, "rb") as f: # Open file in binary mode for reading 708 for chunk in iter(lambda: f.read(4096), b""): # Read file in chunks 709 hash_obj.update(chunk) 710 return hash_obj.hexdigest() # Return the hash as a hexadecimal string 711 712 def snapshot_cache_path(self): 713 """ 714 Generate the path for the cache file used to store snapshots. 715 716 The cache file is a camel file that stores the timestamps of the snapshots. 717 The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel". 718 719 Returns: 720 str: The path to the cache file. 721 """ 722 path = str(self.path()) 723 ext = self.ext() 724 ext_len = len(ext) 725 if path.endswith(f'.{ext}'): 726 path = path[:-ext_len-1] 727 _, filename = os.path.split(path + f'.snapshots.{ext}') 728 return self.base_path(filename) 729 730 def snapshot(self) -> bool: 731 """ 732 This function creates a snapshot of the current database state. 733 734 The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. 735 If a snapshot with the same hash exists, the function returns True without creating a new snapshot. 736 If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state 737 in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp. 738 739 Parameters: 740 None 741 742 Returns: 743 bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails. 744 """ 745 current_hash = self.file_hash(self.path()) 746 cache: dict[str, int] = {} # hash: time_ns 747 try: 748 with open(self.snapshot_cache_path(), 'r') as stream: 749 cache = camel.load(stream.read()) 750 except: 751 pass 752 if current_hash in cache: 753 return True 754 time = time_ns() 755 cache[current_hash] = time 756 if not self.save(self.base_path('snapshots', f'{time}.{self.ext()}')): 757 return False 758 with open(self.snapshot_cache_path(), 'w') as stream: 759 stream.write(camel.dump(cache)) 760 return True 761 762 def snapshots(self, hide_missing: bool = True, verified_hash_only: bool = False) \ 763 -> dict[int, tuple[str, str, bool]]: 764 """ 765 Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status. 766 767 Parameters: 768 - hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True. 769 - verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False. 770 771 Returns: 772 - dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, 773 and the values are tuples containing the snapshot's hash, path, and existence status. 774 """ 775 cache: dict[str, int] = {} # hash: time_ns 776 try: 777 with open(self.snapshot_cache_path(), 'r') as stream: 778 cache = camel.load(stream.read()) 779 except: 780 pass 781 if not cache: 782 return {} 783 result: dict[int, tuple[str, str, bool]] = {} # time_ns: (hash, path, exists) 784 for file_hash, ref in cache.items(): 785 path = self.base_path('snapshots', f'{ref}.{self.ext()}') 786 exists = os.path.exists(path) 787 valid_hash = self.file_hash(path) == file_hash if verified_hash_only else True 788 if (verified_hash_only and not valid_hash) or (verified_hash_only and not exists): 789 continue 790 if exists or not hide_missing: 791 result[ref] = (file_hash, path, exists) 792 return result 793 794 def recall(self, dry=True, debug=False) -> bool: 795 """ 796 Revert the last operation. 797 798 Parameters: 799 dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. 800 debug (bool): If True, the function will print debug information. Default is False. 801 802 Returns: 803 bool: True if the operation was successful, False otherwise. 804 """ 805 if not self.nolock() or len(self._vault['history']) == 0: 806 return False 807 if len(self._vault['history']) <= 0: 808 return False 809 ref = sorted(self._vault['history'].keys())[-1] 810 if debug: 811 print('recall', ref) 812 memory = self._vault['history'][ref] 813 if debug: 814 print(type(memory), 'memory', memory) 815 limit = len(memory) + 1 816 sub_positive_log_negative = 0 817 for i in range(-1, -limit, -1): 818 x = memory[i] 819 if debug: 820 print(type(x), x) 821 match x['action']: 822 case Action.CREATE: 823 if x['account'] is not None: 824 if self.account_exists(x['account']): 825 if debug: 826 print('account', self._vault['account'][x['account']]) 827 assert len(self._vault['account'][x['account']]['box']) == 0 828 assert self._vault['account'][x['account']]['balance'] == 0 829 assert self._vault['account'][x['account']]['count'] == 0 830 if dry: 831 continue 832 del self._vault['account'][x['account']] 833 834 case Action.TRACK: 835 if x['account'] is not None: 836 if self.account_exists(x['account']): 837 if dry: 838 continue 839 self._vault['account'][x['account']]['balance'] -= x['value'] 840 self._vault['account'][x['account']]['count'] -= 1 841 del self._vault['account'][x['account']]['box'][x['ref']] 842 843 case Action.LOG: 844 if x['account'] is not None: 845 if self.account_exists(x['account']): 846 if x['ref'] in self._vault['account'][x['account']]['log']: 847 if dry: 848 continue 849 if sub_positive_log_negative == -x['value']: 850 self._vault['account'][x['account']]['count'] -= 1 851 sub_positive_log_negative = 0 852 box_ref = self._vault['account'][x['account']]['log'][x['ref']]['ref'] 853 if not box_ref is None: 854 assert self.box_exists(x['account'], box_ref) 855 box_value = self._vault['account'][x['account']]['log'][x['ref']]['value'] 856 assert box_value < 0 857 858 try: 859 self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value 860 except TypeError: 861 self._vault['account'][x['account']]['box'][box_ref]['rest'] += Decimal( 862 -box_value) 863 864 try: 865 self._vault['account'][x['account']]['balance'] += -box_value 866 except TypeError: 867 self._vault['account'][x['account']]['balance'] += Decimal(-box_value) 868 869 self._vault['account'][x['account']]['count'] -= 1 870 del self._vault['account'][x['account']]['log'][x['ref']] 871 872 case Action.SUB: 873 if x['account'] is not None: 874 if self.account_exists(x['account']): 875 if x['ref'] in self._vault['account'][x['account']]['box']: 876 if dry: 877 continue 878 self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value'] 879 self._vault['account'][x['account']]['balance'] += x['value'] 880 sub_positive_log_negative = x['value'] 881 882 case Action.ADD_FILE: 883 if x['account'] is not None: 884 if self.account_exists(x['account']): 885 if x['ref'] in self._vault['account'][x['account']]['log']: 886 if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']: 887 if dry: 888 continue 889 del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] 890 891 case Action.REMOVE_FILE: 892 if x['account'] is not None: 893 if self.account_exists(x['account']): 894 if x['ref'] in self._vault['account'][x['account']]['log']: 895 if dry: 896 continue 897 self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value'] 898 899 case Action.BOX_TRANSFER: 900 if x['account'] is not None: 901 if self.account_exists(x['account']): 902 if x['ref'] in self._vault['account'][x['account']]['box']: 903 if dry: 904 continue 905 self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value'] 906 907 case Action.EXCHANGE: 908 if x['account'] is not None: 909 if x['account'] in self._vault['exchange']: 910 if x['ref'] in self._vault['exchange'][x['account']]: 911 if dry: 912 continue 913 del self._vault['exchange'][x['account']][x['ref']] 914 915 case Action.REPORT: 916 if x['ref'] in self._vault['report']: 917 if dry: 918 continue 919 del self._vault['report'][x['ref']] 920 921 case Action.ZAKAT: 922 if x['account'] is not None: 923 if self.account_exists(x['account']): 924 if x['ref'] in self._vault['account'][x['account']]['box']: 925 if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]: 926 if dry: 927 continue 928 match x['math']: 929 case MathOperation.ADDITION: 930 self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x[ 931 'value'] 932 case MathOperation.EQUAL: 933 self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value'] 934 case MathOperation.SUBTRACTION: 935 self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x[ 936 'value'] 937 938 if not dry: 939 del self._vault['history'][ref] 940 return True 941 942 def ref_exists(self, account: str, ref_type: str, ref: int) -> bool: 943 """ 944 Check if a specific reference (transaction) exists in the vault for a given account and reference type. 945 946 Parameters: 947 account (str): The account number for which to check the existence of the reference. 948 ref_type (str): The type of reference (e.g., 'box', 'log', etc.). 949 ref (int): The reference (transaction) number to check for existence. 950 951 Returns: 952 bool: True if the reference exists for the given account and reference type, False otherwise. 953 """ 954 if account in self._vault['account']: 955 return ref in self._vault['account'][account][ref_type] 956 return False 957 958 def box_exists(self, account: str, ref: int) -> bool: 959 """ 960 Check if a specific box (transaction) exists in the vault for a given account and reference. 961 962 Parameters: 963 - account (str): The account number for which to check the existence of the box. 964 - ref (int): The reference (transaction) number to check for existence. 965 966 Returns: 967 - bool: True if the box exists for the given account and reference, False otherwise. 968 """ 969 return self.ref_exists(account, 'box', ref) 970 971 def track(self, unscaled_value: float | int | Decimal = 0, desc: str = '', account: str = 1, logging: bool = True, 972 created: int = None, 973 debug: bool = False) -> int: 974 """ 975 This function tracks a transaction for a specific account. 976 977 Parameters: 978 unscaled_value (float | int | Decimal): The value of the transaction. Default is 0. 979 desc (str): The description of the transaction. Default is an empty string. 980 account (str): The account for which the transaction is being tracked. Default is '1'. 981 logging (bool): Whether to log the transaction. Default is True. 982 created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. 983 debug (bool): Whether to print debug information. Default is False. 984 985 Returns: 986 int: The timestamp of the transaction. 987 988 This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box. 989 990 Raises: 991 ValueError: The log transaction happened again in the same nanosecond time. 992 ValueError: The box transaction happened again in the same nanosecond time. 993 """ 994 if debug: 995 print('track', f'unscaled_value={unscaled_value}, debug={debug}') 996 if created is None: 997 created = self.time() 998 no_lock = self.nolock() 999 self.lock() 1000 if not self.account_exists(account): 1001 if debug: 1002 print(f"account {account} created") 1003 self._vault['account'][account] = { 1004 'balance': 0, 1005 'box': {}, 1006 'count': 0, 1007 'log': {}, 1008 'hide': False, 1009 'zakatable': True, 1010 } 1011 self._step(Action.CREATE, account) 1012 if unscaled_value == 0: 1013 if no_lock: 1014 self.free(self.lock()) 1015 return 0 1016 value = self.scale(unscaled_value) 1017 if logging: 1018 self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug) 1019 if debug: 1020 print('create-box', created) 1021 if self.box_exists(account, created): 1022 raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).") 1023 if debug: 1024 print('created-box', created) 1025 self._vault['account'][account]['box'][created] = { 1026 'capital': value, 1027 'count': 0, 1028 'last': 0, 1029 'rest': value, 1030 'total': 0, 1031 } 1032 self._step(Action.TRACK, account, ref=created, value=value) 1033 if no_lock: 1034 self.free(self.lock()) 1035 return created 1036 1037 def log_exists(self, account: str, ref: int) -> bool: 1038 """ 1039 Checks if a specific transaction log entry exists for a given account. 1040 1041 Parameters: 1042 account (str): The account number associated with the transaction log. 1043 ref (int): The reference to the transaction log entry. 1044 1045 Returns: 1046 bool: True if the transaction log entry exists, False otherwise. 1047 """ 1048 return self.ref_exists(account, 'log', ref) 1049 1050 def _log(self, value: float, desc: str = '', account: str = 1, created: int = None, ref: int = None, 1051 debug: bool = False) -> int: 1052 """ 1053 Log a transaction into the account's log. 1054 1055 Parameters: 1056 value (float): The value of the transaction. 1057 desc (str): The description of the transaction. 1058 account (str): The account to log the transaction into. Default is '1'. 1059 created (int): The timestamp of the transaction. If not provided, it will be generated. 1060 1061 Returns: 1062 int: The timestamp of the logged transaction. 1063 1064 This method updates the account's balance, count, and log with the transaction details. 1065 It also creates a step in the history of the transaction. 1066 1067 Raises: 1068 ValueError: The log transaction happened again in the same nanosecond time. 1069 """ 1070 if debug: 1071 print('_log', f'debug={debug}') 1072 if created is None: 1073 created = self.time() 1074 try: 1075 self._vault['account'][account]['balance'] += value 1076 except TypeError: 1077 self._vault['account'][account]['balance'] += Decimal(value) 1078 self._vault['account'][account]['count'] += 1 1079 if debug: 1080 print('create-log', created) 1081 if self.log_exists(account, created): 1082 raise ValueError(f"The log transaction happened again in the same nanosecond time({created}).") 1083 if debug: 1084 print('created-log', created) 1085 self._vault['account'][account]['log'][created] = { 1086 'value': value, 1087 'desc': desc, 1088 'ref': ref, 1089 'file': {}, 1090 } 1091 self._step(Action.LOG, account, ref=created, value=value) 1092 return created 1093 1094 def exchange(self, account, created: int = None, rate: float = None, description: str = None, 1095 debug: bool = False) -> dict: 1096 """ 1097 This method is used to record or retrieve exchange rates for a specific account. 1098 1099 Parameters: 1100 - account (str): The account number for which the exchange rate is being recorded or retrieved. 1101 - created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used. 1102 - rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate. 1103 - description (str): A description of the exchange rate. 1104 1105 Returns: 1106 - dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, 1107 it returns a dictionary with default values for the rate and description. 1108 """ 1109 if debug: 1110 print('exchange', f'debug={debug}') 1111 if created is None: 1112 created = self.time() 1113 no_lock = self.nolock() 1114 self.lock() 1115 if rate is not None: 1116 if rate <= 0: 1117 return dict() 1118 if account not in self._vault['exchange']: 1119 self._vault['exchange'][account] = {} 1120 if len(self._vault['exchange'][account]) == 0 and rate <= 1: 1121 return {"time": created, "rate": 1, "description": None} 1122 self._vault['exchange'][account][created] = {"rate": rate, "description": description} 1123 self._step(Action.EXCHANGE, account, ref=created, value=rate) 1124 if no_lock: 1125 self.free(self.lock()) 1126 if debug: 1127 print("exchange-created-1", 1128 f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1129 1130 if account in self._vault['exchange']: 1131 valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created] 1132 if valid_rates: 1133 latest_rate = max(valid_rates, key=lambda x: x[0]) 1134 if debug: 1135 print("exchange-read-1", 1136 f'account: {account}, created: {created}, rate:{rate}, description:{description}', 1137 'latest_rate', latest_rate) 1138 result = latest_rate[1] 1139 result['time'] = latest_rate[0] 1140 return result # إرجاع قاموس يحتوي على المعدل والوصف 1141 if debug: 1142 print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1143 return {"time": created, "rate": 1, "description": None} # إرجاع القيمة الافتراضية مع وصف فارغ 1144 1145 @staticmethod 1146 def exchange_calc(x: float, x_rate: float, y_rate: float) -> float: 1147 """ 1148 This function calculates the exchanged amount of a currency. 1149 1150 Args: 1151 x (float): The original amount of the currency. 1152 x_rate (float): The exchange rate of the original currency. 1153 y_rate (float): The exchange rate of the target currency. 1154 1155 Returns: 1156 float: The exchanged amount of the target currency. 1157 """ 1158 return (x * x_rate) / y_rate 1159 1160 def exchanges(self) -> dict: 1161 """ 1162 Retrieve the recorded exchange rates for all accounts. 1163 1164 Parameters: 1165 None 1166 1167 Returns: 1168 dict: A dictionary containing all recorded exchange rates. 1169 The keys are account names or numbers, and the values are dictionaries containing the exchange rates. 1170 Each exchange rate dictionary has timestamps as keys and exchange rate details as values. 1171 """ 1172 return self._vault['exchange'].copy() 1173 1174 def accounts(self) -> dict: 1175 """ 1176 Returns a dictionary containing account numbers as keys and their respective balances as values. 1177 1178 Parameters: 1179 None 1180 1181 Returns: 1182 dict: A dictionary where keys are account numbers and values are their respective balances. 1183 """ 1184 result = {} 1185 for i in self._vault['account']: 1186 result[i] = self._vault['account'][i]['balance'] 1187 return result 1188 1189 def boxes(self, account) -> dict: 1190 """ 1191 Retrieve the boxes (transactions) associated with a specific account. 1192 1193 Parameters: 1194 account (str): The account number for which to retrieve the boxes. 1195 1196 Returns: 1197 dict: A dictionary containing the boxes associated with the given account. 1198 If the account does not exist, an empty dictionary is returned. 1199 """ 1200 if self.account_exists(account): 1201 return self._vault['account'][account]['box'] 1202 return {} 1203 1204 def logs(self, account) -> dict: 1205 """ 1206 Retrieve the logs (transactions) associated with a specific account. 1207 1208 Parameters: 1209 account (str): The account number for which to retrieve the logs. 1210 1211 Returns: 1212 dict: A dictionary containing the logs associated with the given account. 1213 If the account does not exist, an empty dictionary is returned. 1214 """ 1215 if self.account_exists(account): 1216 return self._vault['account'][account]['log'] 1217 return {} 1218 1219 def add_file(self, account: str, ref: int, path: str) -> int: 1220 """ 1221 Adds a file reference to a specific transaction log entry in the vault. 1222 1223 Parameters: 1224 account (str): The account number associated with the transaction log. 1225 ref (int): The reference to the transaction log entry. 1226 path (str): The path of the file to be added. 1227 1228 Returns: 1229 int: The reference of the added file. If the account or transaction log entry does not exist, returns 0. 1230 """ 1231 if self.account_exists(account): 1232 if ref in self._vault['account'][account]['log']: 1233 file_ref = self.time() 1234 self._vault['account'][account]['log'][ref]['file'][file_ref] = path 1235 no_lock = self.nolock() 1236 self.lock() 1237 self._step(Action.ADD_FILE, account, ref=ref, file=file_ref) 1238 if no_lock: 1239 self.free(self.lock()) 1240 return file_ref 1241 return 0 1242 1243 def remove_file(self, account: str, ref: int, file_ref: int) -> bool: 1244 """ 1245 Removes a file reference from a specific transaction log entry in the vault. 1246 1247 Parameters: 1248 account (str): The account number associated with the transaction log. 1249 ref (int): The reference to the transaction log entry. 1250 file_ref (int): The reference of the file to be removed. 1251 1252 Returns: 1253 bool: True if the file reference is successfully removed, False otherwise. 1254 """ 1255 if self.account_exists(account): 1256 if ref in self._vault['account'][account]['log']: 1257 if file_ref in self._vault['account'][account]['log'][ref]['file']: 1258 x = self._vault['account'][account]['log'][ref]['file'][file_ref] 1259 del self._vault['account'][account]['log'][ref]['file'][file_ref] 1260 no_lock = self.nolock() 1261 self.lock() 1262 self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x) 1263 if no_lock: 1264 self.free(self.lock()) 1265 return True 1266 return False 1267 1268 def balance(self, account: str = 1, cached: bool = True) -> int: 1269 """ 1270 Calculate and return the balance of a specific account. 1271 1272 Parameters: 1273 account (str): The account number. Default is '1'. 1274 cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True. 1275 1276 Returns: 1277 int: The balance of the account. 1278 1279 Note: 1280 If cached is True, the function returns the cached balance. 1281 If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items. 1282 """ 1283 if cached: 1284 return self._vault['account'][account]['balance'] 1285 x = 0 1286 return [x := x + y['rest'] for y in self._vault['account'][account]['box'].values()][-1] 1287 1288 def hide(self, account, status: bool = None) -> bool: 1289 """ 1290 Check or set the hide status of a specific account. 1291 1292 Parameters: 1293 account (str): The account number. 1294 status (bool, optional): The new hide status. If not provided, the function will return the current status. 1295 1296 Returns: 1297 bool: The current or updated hide status of the account. 1298 1299 Raises: 1300 None 1301 1302 Example: 1303 >>> tracker = ZakatTracker() 1304 >>> ref = tracker.track(51, 'desc', 'account1') 1305 >>> tracker.hide('account1') # Set the hide status of 'account1' to True 1306 False 1307 >>> tracker.hide('account1', True) # Set the hide status of 'account1' to True 1308 True 1309 >>> tracker.hide('account1') # Get the hide status of 'account1' by default 1310 True 1311 >>> tracker.hide('account1', False) 1312 False 1313 """ 1314 if self.account_exists(account): 1315 if status is None: 1316 return self._vault['account'][account]['hide'] 1317 self._vault['account'][account]['hide'] = status 1318 return status 1319 return False 1320 1321 def zakatable(self, account, status: bool = None) -> bool: 1322 """ 1323 Check or set the zakatable status of a specific account. 1324 1325 Parameters: 1326 account (str): The account number. 1327 status (bool, optional): The new zakatable status. If not provided, the function will return the current status. 1328 1329 Returns: 1330 bool: The current or updated zakatable status of the account. 1331 1332 Raises: 1333 None 1334 1335 Example: 1336 >>> tracker = ZakatTracker() 1337 >>> ref = tracker.track(51, 'desc', 'account1') 1338 >>> tracker.zakatable('account1') # Set the zakatable status of 'account1' to True 1339 True 1340 >>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True 1341 True 1342 >>> tracker.zakatable('account1') # Get the zakatable status of 'account1' by default 1343 True 1344 >>> tracker.zakatable('account1', False) 1345 False 1346 """ 1347 if self.account_exists(account): 1348 if status is None: 1349 return self._vault['account'][account]['zakatable'] 1350 self._vault['account'][account]['zakatable'] = status 1351 return status 1352 return False 1353 1354 def sub(self, unscaled_value: float | int | Decimal, desc: str = '', account: str = 1, created: int = None, 1355 debug: bool = False) \ 1356 -> tuple[ 1357 int, 1358 list[ 1359 tuple[int, int], 1360 ], 1361 ] | tuple: 1362 """ 1363 Subtracts a specified value from an account's balance. 1364 1365 Parameters: 1366 unscaled_value (float | int | Decimal): The amount to be subtracted. 1367 desc (str): A description for the transaction. Defaults to an empty string. 1368 account (str): The account from which the value will be subtracted. Defaults to '1'. 1369 created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. 1370 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1371 1372 Returns: 1373 tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction. 1374 1375 If the amount to subtract is greater than the account's balance, 1376 the remaining amount will be transferred to a new transaction with a negative value. 1377 1378 Raises: 1379 ValueError: The box transaction happened again in the same nanosecond time. 1380 ValueError: The log transaction happened again in the same nanosecond time. 1381 """ 1382 if debug: 1383 print('sub', f'debug={debug}') 1384 if unscaled_value < 0: 1385 return tuple() 1386 if unscaled_value == 0: 1387 ref = self.track(unscaled_value, '', account) 1388 return ref, ref 1389 if created is None: 1390 created = self.time() 1391 no_lock = self.nolock() 1392 self.lock() 1393 self.track(0, '', account) 1394 value = self.scale(unscaled_value) 1395 self._log(value=-value, desc=desc, account=account, created=created, ref=None, debug=debug) 1396 ids = sorted(self._vault['account'][account]['box'].keys()) 1397 limit = len(ids) + 1 1398 target = value 1399 if debug: 1400 print('ids', ids) 1401 ages = [] 1402 for i in range(-1, -limit, -1): 1403 if target == 0: 1404 break 1405 j = ids[i] 1406 if debug: 1407 print('i', i, 'j', j) 1408 rest = self._vault['account'][account]['box'][j]['rest'] 1409 if rest >= target: 1410 self._vault['account'][account]['box'][j]['rest'] -= target 1411 self._step(Action.SUB, account, ref=j, value=target) 1412 ages.append((j, target)) 1413 target = 0 1414 break 1415 elif target > rest > 0: 1416 chunk = rest 1417 target -= chunk 1418 self._step(Action.SUB, account, ref=j, value=chunk) 1419 ages.append((j, chunk)) 1420 self._vault['account'][account]['box'][j]['rest'] = 0 1421 if target > 0: 1422 self.track( 1423 unscaled_value=self.unscale(-target), 1424 desc=desc, 1425 account=account, 1426 logging=False, 1427 created=created, 1428 ) 1429 ages.append((created, target)) 1430 if no_lock: 1431 self.free(self.lock()) 1432 return created, ages 1433 1434 def transfer(self, unscaled_amount: float | int | Decimal, from_account: str, to_account: str, desc: str = '', 1435 created: int = None, 1436 debug: bool = False) -> list[int]: 1437 """ 1438 Transfers a specified value from one account to another. 1439 1440 Parameters: 1441 unscaled_amount (float | int | Decimal): The amount to be transferred. 1442 from_account (str): The account from which the value will be transferred. 1443 to_account (str): The account to which the value will be transferred. 1444 desc (str, optional): A description for the transaction. Defaults to an empty string. 1445 created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. 1446 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1447 1448 Returns: 1449 list[int]: A list of timestamps corresponding to the transactions made during the transfer. 1450 1451 Raises: 1452 ValueError: Transfer to the same account is forbidden. 1453 ValueError: The box transaction happened again in the same nanosecond time. 1454 ValueError: The log transaction happened again in the same nanosecond time. 1455 """ 1456 if debug: 1457 print('transfer', f'debug={debug}') 1458 if from_account == to_account: 1459 raise ValueError(f'Transfer to the same account is forbidden. {to_account}') 1460 if unscaled_amount <= 0: 1461 return [] 1462 if created is None: 1463 created = self.time() 1464 (_, ages) = self.sub(unscaled_amount, desc, from_account, created, debug=debug) 1465 times = [] 1466 source_exchange = self.exchange(from_account, created) 1467 target_exchange = self.exchange(to_account, created) 1468 1469 if debug: 1470 print('ages', ages) 1471 1472 for age, value in ages: 1473 target_amount = int(self.exchange_calc(value, source_exchange['rate'], target_exchange['rate'])) 1474 if debug: 1475 print('target_amount', target_amount) 1476 # Perform the transfer 1477 if self.box_exists(to_account, age): 1478 if debug: 1479 print('box_exists', age) 1480 capital = self._vault['account'][to_account]['box'][age]['capital'] 1481 rest = self._vault['account'][to_account]['box'][age]['rest'] 1482 if debug: 1483 print( 1484 f"Transfer(loop) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1485 selected_age = age 1486 if rest + target_amount > capital: 1487 self._vault['account'][to_account]['box'][age]['capital'] += target_amount 1488 selected_age = ZakatTracker.time() 1489 self._vault['account'][to_account]['box'][age]['rest'] += target_amount 1490 self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount) 1491 y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account, 1492 created=None, ref=None, debug=debug) 1493 times.append((age, y)) 1494 continue 1495 if debug: 1496 print( 1497 f"Transfer(func) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1498 y = self.track( 1499 unscaled_value=self.unscale(int(target_amount)), 1500 desc=desc, 1501 account=to_account, 1502 logging=True, 1503 created=age, 1504 debug=debug, 1505 ) 1506 times.append(y) 1507 return times 1508 1509 def check(self, silver_gram_price: float, unscaled_nisab: float | int | Decimal = None, debug: bool = False, now: int = None, 1510 cycle: float = None) -> tuple: 1511 """ 1512 Check the eligibility for Zakat based on the given parameters. 1513 1514 Parameters: 1515 silver_gram_price (float): The price of a gram of silver. 1516 unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, 1517 it will be calculated based on the silver_gram_price. 1518 debug (bool): Flag to enable debug mode. 1519 now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). 1520 cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle(). 1521 1522 Returns: 1523 tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, 1524 and a dictionary containing the Zakat plan. 1525 """ 1526 if debug: 1527 print('check', f'debug={debug}') 1528 if now is None: 1529 now = self.time() 1530 if cycle is None: 1531 cycle = ZakatTracker.TimeCycle() 1532 if unscaled_nisab is None: 1533 unscaled_nisab = ZakatTracker.Nisab(silver_gram_price) 1534 nisab = self.scale(unscaled_nisab) 1535 plan = {} 1536 below_nisab = 0 1537 brief = [0, 0, 0] 1538 valid = False 1539 if debug: 1540 print('exchanges', self.exchanges()) 1541 for x in self._vault['account']: 1542 if not self.zakatable(x): 1543 continue 1544 _box = self._vault['account'][x]['box'] 1545 _log = self._vault['account'][x]['log'] 1546 limit = len(_box) + 1 1547 ids = sorted(self._vault['account'][x]['box'].keys()) 1548 for i in range(-1, -limit, -1): 1549 j = ids[i] 1550 rest = float(_box[j]['rest']) 1551 if rest <= 0: 1552 continue 1553 exchange = self.exchange(x, created=self.time()) 1554 rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1) 1555 brief[0] += rest 1556 index = limit + i - 1 1557 epoch = (now - j) / cycle 1558 if debug: 1559 print(f"Epoch: {epoch}", _box[j]) 1560 if _box[j]['last'] > 0: 1561 epoch = (now - _box[j]['last']) / cycle 1562 if debug: 1563 print(f"Epoch: {epoch}") 1564 epoch = floor(epoch) 1565 if debug: 1566 print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch) 1567 if epoch == 0: 1568 continue 1569 if debug: 1570 print("Epoch - PASSED") 1571 brief[1] += rest 1572 if rest >= nisab: 1573 total = 0 1574 for _ in range(epoch): 1575 total += ZakatTracker.ZakatCut(float(rest) - float(total)) 1576 if total > 0: 1577 if x not in plan: 1578 plan[x] = {} 1579 valid = True 1580 brief[2] += total 1581 plan[x][index] = { 1582 'total': total, 1583 'count': epoch, 1584 'box_time': j, 1585 'box_capital': _box[j]['capital'], 1586 'box_rest': _box[j]['rest'], 1587 'box_last': _box[j]['last'], 1588 'box_total': _box[j]['total'], 1589 'box_count': _box[j]['count'], 1590 'box_log': _log[j]['desc'], 1591 'exchange_rate': exchange['rate'], 1592 'exchange_time': exchange['time'], 1593 'exchange_desc': exchange['description'], 1594 } 1595 else: 1596 chunk = ZakatTracker.ZakatCut(float(rest)) 1597 if chunk > 0: 1598 if x not in plan: 1599 plan[x] = {} 1600 if j not in plan[x].keys(): 1601 plan[x][index] = {} 1602 below_nisab += rest 1603 brief[2] += chunk 1604 plan[x][index]['below_nisab'] = chunk 1605 plan[x][index]['total'] = chunk 1606 plan[x][index]['count'] = epoch 1607 plan[x][index]['box_time'] = j 1608 plan[x][index]['box_capital'] = _box[j]['capital'] 1609 plan[x][index]['box_rest'] = _box[j]['rest'] 1610 plan[x][index]['box_last'] = _box[j]['last'] 1611 plan[x][index]['box_total'] = _box[j]['total'] 1612 plan[x][index]['box_count'] = _box[j]['count'] 1613 plan[x][index]['box_log'] = _log[j]['desc'] 1614 plan[x][index]['exchange_rate'] = exchange['rate'] 1615 plan[x][index]['exchange_time'] = exchange['time'] 1616 plan[x][index]['exchange_desc'] = exchange['description'] 1617 valid = valid or below_nisab >= nisab 1618 if debug: 1619 print(f"below_nisab({below_nisab}) >= nisab({nisab})") 1620 return valid, brief, plan 1621 1622 def build_payment_parts(self, scaled_demand: int, positive_only: bool = True) -> dict: 1623 """ 1624 Build payment parts for the Zakat distribution. 1625 1626 Parameters: 1627 scaled_demand (int): The total demand for payment in local currency. 1628 positive_only (bool): If True, only consider accounts with positive balance. Default is True. 1629 1630 Returns: 1631 dict: A dictionary containing the payment parts for each account. The dictionary has the following structure: 1632 { 1633 'account': { 1634 'account_id': {'balance': float, 'rate': float, 'part': float}, 1635 ... 1636 }, 1637 'exceed': bool, 1638 'demand': int, 1639 'total': float, 1640 } 1641 """ 1642 total = 0 1643 parts = { 1644 'account': {}, 1645 'exceed': False, 1646 'demand': int(round(scaled_demand)), 1647 } 1648 for x, y in self.accounts().items(): 1649 if positive_only and y <= 0: 1650 continue 1651 total += float(y) 1652 exchange = self.exchange(x) 1653 parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0} 1654 parts['total'] = total 1655 return parts 1656 1657 @staticmethod 1658 def check_payment_parts(parts: dict, debug: bool = False) -> int: 1659 """ 1660 Checks the validity of payment parts. 1661 1662 Parameters: 1663 parts (dict): A dictionary containing payment parts information. 1664 debug (bool): Flag to enable debug mode. 1665 1666 Returns: 1667 int: Returns 0 if the payment parts are valid, otherwise returns the error code. 1668 1669 Error Codes: 1670 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 1671 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 1672 3: 'part' value in parts['account'][x] is less than 0. 1673 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 1674 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 1675 6: The sum of 'part' values in parts['account'] does not match with 'demand' value. 1676 """ 1677 if debug: 1678 print('check_payment_parts', f'debug={debug}') 1679 for i in ['demand', 'account', 'total', 'exceed']: 1680 if i not in parts: 1681 return 1 1682 exceed = parts['exceed'] 1683 for x in parts['account']: 1684 for j in ['balance', 'rate', 'part']: 1685 if j not in parts['account'][x]: 1686 return 2 1687 if parts['account'][x]['part'] < 0: 1688 return 3 1689 if not exceed and parts['account'][x]['balance'] <= 0: 1690 return 4 1691 demand = parts['demand'] 1692 z = 0 1693 for _, y in parts['account'].items(): 1694 if not exceed and y['part'] > y['balance']: 1695 return 5 1696 z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1) 1697 z = round(z, 2) 1698 demand = round(demand, 2) 1699 if debug: 1700 print('check_payment_parts', f'z = {z}, demand = {demand}') 1701 print('check_payment_parts', type(z), type(demand)) 1702 print('check_payment_parts', z != demand) 1703 print('check_payment_parts', str(z) != str(demand)) 1704 if z != demand and str(z) != str(demand): 1705 return 6 1706 return 0 1707 1708 def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool: 1709 """ 1710 Perform Zakat calculation based on the given report and optional parts. 1711 1712 Parameters: 1713 report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. 1714 parts (dict): A dictionary containing the payment parts for the zakat. 1715 debug (bool): A flag indicating whether to print debug information. 1716 1717 Returns: 1718 bool: True if the zakat calculation is successful, False otherwise. 1719 """ 1720 if debug: 1721 print('zakat', f'debug={debug}') 1722 valid, _, plan = report 1723 if not valid: 1724 return valid 1725 parts_exist = parts is not None 1726 if parts_exist: 1727 if self.check_payment_parts(parts, debug=debug) != 0: 1728 return False 1729 if debug: 1730 print('######### zakat #######') 1731 print('parts_exist', parts_exist) 1732 no_lock = self.nolock() 1733 self.lock() 1734 report_time = self.time() 1735 self._vault['report'][report_time] = report 1736 self._step(Action.REPORT, ref=report_time) 1737 created = self.time() 1738 for x in plan: 1739 target_exchange = self.exchange(x) 1740 if debug: 1741 print(plan[x]) 1742 print('-------------') 1743 print(self._vault['account'][x]['box']) 1744 ids = sorted(self._vault['account'][x]['box'].keys()) 1745 if debug: 1746 print('plan[x]', plan[x]) 1747 for i in plan[x].keys(): 1748 j = ids[i] 1749 if debug: 1750 print('i', i, 'j', j) 1751 self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'], 1752 key='last', 1753 math_operation=MathOperation.EQUAL) 1754 self._vault['account'][x]['box'][j]['last'] = created 1755 amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate'])) 1756 self._vault['account'][x]['box'][j]['total'] += amount 1757 self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total', 1758 math_operation=MathOperation.ADDITION) 1759 self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count'] 1760 self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count', 1761 math_operation=MathOperation.ADDITION) 1762 if not parts_exist: 1763 try: 1764 self._vault['account'][x]['box'][j]['rest'] -= amount 1765 except TypeError: 1766 self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount) 1767 # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest', 1768 # math_operation=MathOperation.SUBTRACTION) 1769 self._log(-float(amount), desc='zakat-زكاة', account=x, created=None, ref=j, debug=debug) 1770 if parts_exist: 1771 for account, part in parts['account'].items(): 1772 if part['part'] == 0: 1773 continue 1774 if debug: 1775 print('zakat-part', account, part['rate']) 1776 target_exchange = self.exchange(account) 1777 amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate']) 1778 self.sub( 1779 unscaled_value=self.unscale(amount), 1780 desc='zakat-part-دفعة-زكاة', 1781 account=account, 1782 debug=debug, 1783 ) 1784 if no_lock: 1785 self.free(self.lock()) 1786 return True 1787 1788 def export_json(self, path: str = "data.json") -> bool: 1789 """ 1790 Exports the current state of the ZakatTracker object to a JSON file. 1791 1792 Parameters: 1793 path (str): The path where the JSON file will be saved. Default is "data.json". 1794 1795 Returns: 1796 bool: True if the export is successful, False otherwise. 1797 1798 Raises: 1799 No specific exceptions are raised by this method. 1800 """ 1801 with open(path, "w") as file: 1802 json.dump(self._vault, file, indent=4, cls=JSONEncoder) 1803 return True 1804 1805 def save(self, path: str = None) -> bool: 1806 """ 1807 Saves the ZakatTracker's current state to a camel file. 1808 1809 This method serializes the internal data (`_vault`). 1810 1811 Parameters: 1812 path (str, optional): File path for saving. Defaults to a predefined location. 1813 1814 Returns: 1815 bool: True if the save operation is successful, False otherwise. 1816 """ 1817 if path is None: 1818 path = self.path() 1819 with open(f'{path}.tmp', 'w') as stream: 1820 # first save in tmp file 1821 stream.write(camel.dump(self._vault)) 1822 # then move tmp file to original location 1823 shutil.move(f'{path}.tmp', path) 1824 return True 1825 1826 def load(self, path: str = None) -> bool: 1827 """ 1828 Load the current state of the ZakatTracker object from a camel file. 1829 1830 Parameters: 1831 path (str): The path where the camel file is located. If not provided, it will use the default path. 1832 1833 Returns: 1834 bool: True if the load operation is successful, False otherwise. 1835 """ 1836 if path is None: 1837 path = self.path() 1838 if os.path.exists(path): 1839 with open(path, 'r') as stream: 1840 self._vault = camel.load(stream.read()) 1841 return True 1842 return False 1843 1844 def import_csv_cache_path(self): 1845 """ 1846 Generates the cache file path for imported CSV data. 1847 1848 This function constructs the file path where cached data from CSV imports 1849 will be stored. The cache file is a camel file (.camel extension) appended 1850 to the base path of the object. 1851 1852 Returns: 1853 str: The full path to the import CSV cache file. 1854 1855 Example: 1856 >>> obj = ZakatTracker('/data/reports') 1857 >>> obj.import_csv_cache_path() 1858 '/data/reports.import_csv.camel' 1859 """ 1860 path = str(self.path()) 1861 ext = self.ext() 1862 ext_len = len(ext) 1863 if path.endswith(f'.{ext}'): 1864 path = path[:-ext_len-1] 1865 _, filename = os.path.split(path + f'.import_csv.{ext}') 1866 return self.base_path(filename) 1867 1868 def import_csv(self, path: str = 'file.csv', scale_decimal_places: int = 0, debug: bool = False) -> tuple: 1869 """ 1870 The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system. 1871 1872 Parameters: 1873 path (str): The path to the CSV file. Default is 'file.csv'. 1874 scale_decimal_places (int): The number of decimal places to scale the value. Default is 0. 1875 debug (bool): A flag indicating whether to print debug information. 1876 1877 Returns: 1878 tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, 1879 and a dictionary of bad transactions. 1880 1881 Notes: 1882 * Currency Pair Assumption: This function assumes that the exchange rates stored for each account 1883 are appropriate for the currency pairs involved in the conversions. 1884 * The exchange rate for each account is based on the last encountered transaction rate that is not equal 1885 to 1.0 or the previous rate for that account. 1886 * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent 1887 transactions of the same account within the whole imported and existing dataset when doing `check` and 1888 `zakat` operations. 1889 1890 Example Usage: 1891 The CSV file should have the following format, rate is optional per transaction: 1892 account, desc, value, date, rate 1893 For example: 1894 safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1 1895 """ 1896 if debug: 1897 print('import_csv', f'debug={debug}') 1898 cache: list[int] = [] 1899 try: 1900 with open(self.import_csv_cache_path(), 'r') as stream: 1901 cache = camel.load(stream.read()) 1902 except: 1903 pass 1904 date_formats = [ 1905 "%Y-%m-%d %H:%M:%S", 1906 "%Y-%m-%dT%H:%M:%S", 1907 "%Y-%m-%dT%H%M%S", 1908 "%Y-%m-%d", 1909 ] 1910 created, found, bad = 0, 0, {} 1911 data: dict[int, list] = {} 1912 with open(path, newline='', encoding="utf-8") as f: 1913 i = 0 1914 for row in csv.reader(f, delimiter=','): 1915 i += 1 1916 hashed = hash(tuple(row)) 1917 if hashed in cache: 1918 found += 1 1919 continue 1920 account = row[0] 1921 desc = row[1] 1922 value = float(row[2]) 1923 rate = 1.0 1924 if row[4:5]: # Empty list if index is out of range 1925 rate = float(row[4]) 1926 date: int = 0 1927 for time_format in date_formats: 1928 try: 1929 date = self.time(datetime.datetime.strptime(row[3], time_format)) 1930 break 1931 except: 1932 pass 1933 # TODO: not allowed for negative dates in the future after enhance time functions 1934 if date == 0: 1935 bad[i] = row + ['invalid date'] 1936 if value == 0: 1937 bad[i] = row + ['invalid value'] 1938 continue 1939 if date not in data: 1940 data[date] = [] 1941 data[date].append((i, account, desc, value, date, rate, hashed)) 1942 1943 if debug: 1944 print('import_csv', len(data)) 1945 1946 if bad: 1947 return created, found, bad 1948 1949 for date, rows in sorted(data.items()): 1950 try: 1951 len_rows = len(rows) 1952 if len_rows == 1: 1953 (_, account, desc, unscaled_value, date, rate, hashed) = rows[0] 1954 value = self.unscale(unscaled_value, decimal_places=scale_decimal_places) if scale_decimal_places > 0 else unscaled_value 1955 if rate > 0: 1956 self.exchange(account=account, created=date, rate=rate) 1957 if value > 0: 1958 self.track(unscaled_value=value, desc=desc, account=account, logging=True, created=date) 1959 elif value < 0: 1960 self.sub(unscaled_value=-value, desc=desc, account=account, created=date) 1961 created += 1 1962 cache.append(hashed) 1963 continue 1964 if debug: 1965 print('-- Duplicated time detected', date, 'len', len_rows) 1966 print(rows) 1967 print('---------------------------------') 1968 # If records are found at the same time with different accounts in the same amount 1969 # (one positive and the other negative), this indicates it is a transfer. 1970 if len_rows != 2: 1971 raise Exception(f'more than two transactions({len_rows}) at the same time') 1972 (i, account1, desc1, unscaled_value1, date1, rate1, _) = rows[0] 1973 (j, account2, desc2, unscaled_value2, date2, rate2, _) = rows[1] 1974 if account1 == account2 or desc1 != desc2 or abs(unscaled_value1) != abs(unscaled_value2) or date1 != date2: 1975 raise Exception('invalid transfer') 1976 if rate1 > 0: 1977 self.exchange(account1, created=date1, rate=rate1) 1978 if rate2 > 0: 1979 self.exchange(account2, created=date2, rate=rate2) 1980 value1 = self.unscale(unscaled_value1, decimal_places=scale_decimal_places) if scale_decimal_places > 0 else unscaled_value1 1981 value2 = self.unscale(unscaled_value2, decimal_places=scale_decimal_places) if scale_decimal_places > 0 else unscaled_value2 1982 values = { 1983 value1: account1, 1984 value2: account2, 1985 } 1986 self.transfer( 1987 unscaled_amount=abs(value1), 1988 from_account=values[min(values.keys())], 1989 to_account=values[max(values.keys())], 1990 desc=desc1, 1991 created=date1, 1992 ) 1993 except Exception as e: 1994 for (i, account, desc, value, date, rate, _) in rows: 1995 bad[i] = (account, desc, value, date, rate, e) 1996 break 1997 with open(self.import_csv_cache_path(), 'w') as stream: 1998 stream.write(camel.dump(cache)) 1999 return created, found, bad 2000 2001 ######## 2002 # TESTS # 2003 ####### 2004 2005 @staticmethod 2006 def human_readable_size(size: float, decimal_places: int = 2) -> str: 2007 """ 2008 Converts a size in bytes to a human-readable format (e.g., KB, MB, GB). 2009 2010 This function iterates through progressively larger units of information 2011 (B, KB, MB, GB, etc.) and divides the input size until it fits within a 2012 range that can be expressed with a reasonable number before the unit. 2013 2014 Parameters: 2015 size (float): The size in bytes to convert. 2016 decimal_places (int, optional): The number of decimal places to display 2017 in the result. Defaults to 2. 2018 2019 Returns: 2020 str: A string representation of the size in a human-readable format, 2021 rounded to the specified number of decimal places. For example: 2022 - "1.50 KB" (1536 bytes) 2023 - "23.00 MB" (24117248 bytes) 2024 - "1.23 GB" (1325899906 bytes) 2025 """ 2026 if type(size) not in (float, int): 2027 raise TypeError("size must be a float or integer") 2028 if type(decimal_places) != int: 2029 raise TypeError("decimal_places must be an integer") 2030 for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: 2031 if size < 1024.0: 2032 break 2033 size /= 1024.0 2034 return f"{size:.{decimal_places}f} {unit}" 2035 2036 @staticmethod 2037 def get_dict_size(obj: dict, seen: set = None) -> float: 2038 """ 2039 Recursively calculates the approximate memory size of a dictionary and its contents in bytes. 2040 2041 This function traverses the dictionary structure, accounting for the size of keys, values, 2042 and any nested objects. It handles various data types commonly found in dictionaries 2043 (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case 2044 of circular references. 2045 2046 Parameters: 2047 obj (dict): The dictionary whose size is to be calculated. 2048 seen (set, optional): A set used internally to track visited objects 2049 and avoid circular references. Defaults to None. 2050 2051 Returns: 2052 float: An approximate size of the dictionary and its contents in bytes. 2053 2054 Note: 2055 - This function is a method of the `ZakatTracker` class and is likely used to 2056 estimate the memory footprint of data structures relevant to Zakat calculations. 2057 - The size calculation is approximate as it relies on `sys.getsizeof()`, which might 2058 not account for all memory overhead depending on the Python implementation. 2059 - Circular references are handled to prevent infinite recursion. 2060 - Basic numeric types (int, float, complex) are assumed to have fixed sizes. 2061 - String sizes are estimated based on character length and encoding. 2062 """ 2063 size = 0 2064 if seen is None: 2065 seen = set() 2066 2067 obj_id = id(obj) 2068 if obj_id in seen: 2069 return 0 2070 2071 seen.add(obj_id) 2072 size += sys.getsizeof(obj) 2073 2074 if isinstance(obj, dict): 2075 for k, v in obj.items(): 2076 size += ZakatTracker.get_dict_size(k, seen) 2077 size += ZakatTracker.get_dict_size(v, seen) 2078 elif isinstance(obj, (list, tuple, set, frozenset)): 2079 for item in obj: 2080 size += ZakatTracker.get_dict_size(item, seen) 2081 elif isinstance(obj, (int, float, complex)): # Handle numbers 2082 pass # Basic numbers have a fixed size, so nothing to add here 2083 elif isinstance(obj, str): # Handle strings 2084 size += len(obj) * sys.getsizeof(str().encode()) # Size per character in bytes 2085 return size 2086 2087 @staticmethod 2088 def duration_from_nanoseconds(ns: int, 2089 show_zeros_in_spoken_time: bool = False, 2090 spoken_time_separator=',', 2091 millennia: str = 'Millennia', 2092 century: str = 'Century', 2093 years: str = 'Years', 2094 days: str = 'Days', 2095 hours: str = 'Hours', 2096 minutes: str = 'Minutes', 2097 seconds: str = 'Seconds', 2098 milli_seconds: str = 'MilliSeconds', 2099 micro_seconds: str = 'MicroSeconds', 2100 nano_seconds: str = 'NanoSeconds', 2101 ) -> tuple: 2102 """ 2103 REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 2104 Convert NanoSeconds to Human Readable Time Format. 2105 A NanoSeconds is a unit of time in the International System of Units (SI) equal 2106 to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second. 2107 Its symbol is μs, sometimes simplified to us when Unicode is not available. 2108 A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond. 2109 2110 INPUT : ms (AKA: MilliSeconds) 2111 OUTPUT: tuple(string time_lapsed, string spoken_time) like format. 2112 OUTPUT Variables: time_lapsed, spoken_time 2113 2114 Example Input: duration_from_nanoseconds(ns) 2115 **"Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds"** 2116 Example Output: ('039:0001:047:325:05:02:03:456:789:012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds') 2117 duration_from_nanoseconds(1234567890123456789012) 2118 """ 2119 us, ns = divmod(ns, 1000) 2120 ms, us = divmod(us, 1000) 2121 s, ms = divmod(ms, 1000) 2122 m, s = divmod(s, 60) 2123 h, m = divmod(m, 60) 2124 d, h = divmod(h, 24) 2125 y, d = divmod(d, 365) 2126 c, y = divmod(y, 100) 2127 n, c = divmod(c, 10) 2128 time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}" 2129 spoken_time_part = [] 2130 if n > 0 or show_zeros_in_spoken_time: 2131 spoken_time_part.append(f"{n: 3d} {millennia}") 2132 if c > 0 or show_zeros_in_spoken_time: 2133 spoken_time_part.append(f"{c: 4d} {century}") 2134 if y > 0 or show_zeros_in_spoken_time: 2135 spoken_time_part.append(f"{y: 3d} {years}") 2136 if d > 0 or show_zeros_in_spoken_time: 2137 spoken_time_part.append(f"{d: 4d} {days}") 2138 if h > 0 or show_zeros_in_spoken_time: 2139 spoken_time_part.append(f"{h: 2d} {hours}") 2140 if m > 0 or show_zeros_in_spoken_time: 2141 spoken_time_part.append(f"{m: 2d} {minutes}") 2142 if s > 0 or show_zeros_in_spoken_time: 2143 spoken_time_part.append(f"{s: 2d} {seconds}") 2144 if ms > 0 or show_zeros_in_spoken_time: 2145 spoken_time_part.append(f"{ms: 3d} {milli_seconds}") 2146 if us > 0 or show_zeros_in_spoken_time: 2147 spoken_time_part.append(f"{us: 3d} {micro_seconds}") 2148 if ns > 0 or show_zeros_in_spoken_time: 2149 spoken_time_part.append(f"{ns: 3d} {nano_seconds}") 2150 return time_lapsed, spoken_time_separator.join(spoken_time_part) 2151 2152 @staticmethod 2153 def day_to_time(day: int, month: int = 6, year: int = 2024) -> int: # افتراض أن الشهر هو يونيو والسنة 2024 2154 """ 2155 Convert a specific day, month, and year into a timestamp. 2156 2157 Parameters: 2158 day (int): The day of the month. 2159 month (int): The month of the year. Default is 6 (June). 2160 year (int): The year. Default is 2024. 2161 2162 Returns: 2163 int: The timestamp representing the given day, month, and year. 2164 2165 Note: 2166 This method assumes the default month and year if not provided. 2167 """ 2168 return ZakatTracker.time(datetime.datetime(year, month, day)) 2169 2170 @staticmethod 2171 def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime: 2172 """ 2173 Generate a random date between two given dates. 2174 2175 Parameters: 2176 start_date (datetime.datetime): The start date from which to generate a random date. 2177 end_date (datetime.datetime): The end date until which to generate a random date. 2178 2179 Returns: 2180 datetime.datetime: A random date between the start_date and end_date. 2181 """ 2182 time_between_dates = end_date - start_date 2183 days_between_dates = time_between_dates.days 2184 random_number_of_days = random.randrange(days_between_dates) 2185 return start_date + datetime.timedelta(days=random_number_of_days) 2186 2187 @staticmethod 2188 def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False, 2189 debug: bool = False) -> int: 2190 """ 2191 Generate a random CSV file with specified parameters. 2192 2193 Parameters: 2194 path (str): The path where the CSV file will be saved. Default is "data.csv". 2195 count (int): The number of rows to generate in the CSV file. Default is 1000. 2196 with_rate (bool): If True, a random rate between 1.2% and 12% is added. Default is False. 2197 debug (bool): A flag indicating whether to print debug information. 2198 2199 Returns: 2200 None. The function generates a CSV file at the specified path with the given count of rows. 2201 Each row contains a randomly generated account, description, value, and date. 2202 The value is randomly generated between 1000 and 100000, 2203 and the date is randomly generated between 1950-01-01 and 2023-12-31. 2204 If the row number is not divisible by 13, the value is multiplied by -1. 2205 """ 2206 if debug: 2207 print('generate_random_csv_file', f'debug={debug}') 2208 i = 0 2209 with open(path, "w", newline="") as csvfile: 2210 writer = csv.writer(csvfile) 2211 for i in range(count): 2212 account = f"acc-{random.randint(1, 1000)}" 2213 desc = f"Some text {random.randint(1, 1000)}" 2214 value = random.randint(1000, 100000) 2215 date = ZakatTracker.generate_random_date(datetime.datetime(1000, 1, 1), 2216 datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S") 2217 if not i % 13 == 0: 2218 value *= -1 2219 row = [account, desc, value, date] 2220 if with_rate: 2221 rate = random.randint(1, 100) * 0.12 2222 if debug: 2223 print('before-append', row) 2224 row.append(rate) 2225 if debug: 2226 print('after-append', row) 2227 writer.writerow(row) 2228 i = i + 1 2229 return i 2230 2231 @staticmethod 2232 def create_random_list(max_sum, min_value=0, max_value=10): 2233 """ 2234 Creates a list of random integers whose sum does not exceed the specified maximum. 2235 2236 Args: 2237 max_sum: The maximum allowed sum of the list elements. 2238 min_value: The minimum possible value for an element (inclusive). 2239 max_value: The maximum possible value for an element (inclusive). 2240 2241 Returns: 2242 A list of random integers. 2243 """ 2244 result = [] 2245 current_sum = 0 2246 2247 while current_sum < max_sum: 2248 # Calculate the remaining space for the next element 2249 remaining_sum = max_sum - current_sum 2250 # Determine the maximum possible value for the next element 2251 next_max_value = min(remaining_sum, max_value) 2252 # Generate a random element within the allowed range 2253 next_element = random.randint(min_value, next_max_value) 2254 result.append(next_element) 2255 current_sum += next_element 2256 2257 return result 2258 2259 def _test_core(self, restore=False, debug=False): 2260 2261 if debug: 2262 random.seed(1234567890) 2263 2264 # sanity check - random forward time 2265 2266 xlist = [] 2267 limit = 1000 2268 for _ in range(limit): 2269 y = ZakatTracker.time() 2270 z = '-' 2271 if y not in xlist: 2272 xlist.append(y) 2273 else: 2274 z = 'x' 2275 if debug: 2276 print(z, y) 2277 xx = len(xlist) 2278 if debug: 2279 print('count', xx, ' - unique: ', (xx / limit) * 100, '%') 2280 assert limit == xx 2281 2282 # sanity check - convert date since 1000AD 2283 2284 for year in range(1000, 9000): 2285 ns = ZakatTracker.time(datetime.datetime.strptime(f"{year}-12-30 18:30:45", "%Y-%m-%d %H:%M:%S")) 2286 date = ZakatTracker.time_to_datetime(ns) 2287 if debug: 2288 print(date) 2289 assert date.year == year 2290 assert date.month == 12 2291 assert date.day == 30 2292 assert date.hour == 18 2293 assert date.minute == 30 2294 assert date.second in [44, 45] 2295 2296 # human_readable_size 2297 2298 assert ZakatTracker.human_readable_size(0) == "0.00 B" 2299 assert ZakatTracker.human_readable_size(512) == "512.00 B" 2300 assert ZakatTracker.human_readable_size(1023) == "1023.00 B" 2301 2302 assert ZakatTracker.human_readable_size(1024) == "1.00 KB" 2303 assert ZakatTracker.human_readable_size(2048) == "2.00 KB" 2304 assert ZakatTracker.human_readable_size(5120) == "5.00 KB" 2305 2306 assert ZakatTracker.human_readable_size(1024 ** 2) == "1.00 MB" 2307 assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2) == "2.50 MB" 2308 2309 assert ZakatTracker.human_readable_size(1024 ** 3) == "1.00 GB" 2310 assert ZakatTracker.human_readable_size(1024 ** 4) == "1.00 TB" 2311 assert ZakatTracker.human_readable_size(1024 ** 5) == "1.00 PB" 2312 2313 assert ZakatTracker.human_readable_size(1536, decimal_places=0) == "2 KB" 2314 assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2, decimal_places=1) == "2.5 MB" 2315 assert ZakatTracker.human_readable_size(1234567890, decimal_places=3) == "1.150 GB" 2316 2317 try: 2318 # noinspection PyTypeChecker 2319 ZakatTracker.human_readable_size("not a number") 2320 assert False, "Expected TypeError for invalid input" 2321 except TypeError: 2322 pass 2323 2324 try: 2325 # noinspection PyTypeChecker 2326 ZakatTracker.human_readable_size(1024, decimal_places="not an int") 2327 assert False, "Expected TypeError for invalid decimal_places" 2328 except TypeError: 2329 pass 2330 2331 # get_dict_size 2332 assert ZakatTracker.get_dict_size({}) == sys.getsizeof({}), "Empty dictionary size mismatch" 2333 assert ZakatTracker.get_dict_size({"a": 1, "b": 2.5, "c": True}) != sys.getsizeof({}), "Not Empty dictionary" 2334 2335 # number scale 2336 error = 0 2337 total = 0 2338 for sign in ['', '-']: 2339 for max_i, max_j, decimal_places in [ 2340 (101, 101, 2), # fiat currency minimum unit took 2 decimal places 2341 (1, 1_000, 8), # cryptocurrency like Satoshi in Bitcoin took 8 decimal places 2342 (1, 1_000, 18) # cryptocurrency like Wei in Ethereum took 18 decimal places 2343 ]: 2344 for return_type in ( 2345 float, 2346 Decimal, 2347 ): 2348 for i in range(max_i): 2349 for j in range(max_j): 2350 total += 1 2351 num_str = f'{sign}{i}.{j:0{decimal_places}d}' 2352 num = return_type(num_str) 2353 scaled = self.scale(num, decimal_places=decimal_places) 2354 unscaled = self.unscale(scaled, return_type=return_type, decimal_places=decimal_places) 2355 if debug: 2356 print( 2357 f'return_type: {return_type}, num_str: {num_str} - num: {num} - scaled: {scaled} - unscaled: {unscaled}') 2358 if unscaled != num: 2359 if debug: 2360 print('***** SCALE ERROR *****') 2361 error += 1 2362 if debug: 2363 print(f'total: {total}, error({error}): {100 * error / total}%') 2364 assert error == 0 2365 2366 assert self.nolock() 2367 assert self._history() is True 2368 2369 table = { 2370 1: [ 2371 (0, 10, 1000, 1000, 1000, 1, 1), 2372 (0, 20, 3000, 3000, 3000, 2, 2), 2373 (0, 30, 6000, 6000, 6000, 3, 3), 2374 (1, 15, 4500, 4500, 4500, 3, 4), 2375 (1, 50, -500, -500, -500, 4, 5), 2376 (1, 100, -10500, -10500, -10500, 5, 6), 2377 ], 2378 'wallet': [ 2379 (1, 90, -9000, -9000, -9000, 1, 1), 2380 (0, 100, 1000, 1000, 1000, 2, 2), 2381 (1, 190, -18000, -18000, -18000, 3, 3), 2382 (0, 1000, 82000, 82000, 82000, 4, 4), 2383 ], 2384 } 2385 for x in table: 2386 for y in table[x]: 2387 self.lock() 2388 if y[0] == 0: 2389 ref = self.track( 2390 unscaled_value=y[1], 2391 desc='test-add', 2392 account=x, 2393 logging=True, 2394 created=ZakatTracker.time(), 2395 debug=debug, 2396 ) 2397 else: 2398 (ref, z) = self.sub( 2399 unscaled_value=y[1], 2400 desc='test-sub', 2401 account=x, 2402 created=ZakatTracker.time(), 2403 ) 2404 if debug: 2405 print('_sub', z, ZakatTracker.time()) 2406 assert ref != 0 2407 assert len(self._vault['account'][x]['log'][ref]['file']) == 0 2408 for i in range(3): 2409 file_ref = self.add_file(x, ref, 'file_' + str(i)) 2410 sleep(0.0000001) 2411 assert file_ref != 0 2412 if debug: 2413 print('ref', ref, 'file', file_ref) 2414 assert len(self._vault['account'][x]['log'][ref]['file']) == i + 1 2415 file_ref = self.add_file(x, ref, 'file_' + str(3)) 2416 assert self.remove_file(x, ref, file_ref) 2417 z = self.balance(x) 2418 if debug: 2419 print("debug-0", z, y) 2420 assert z == y[2] 2421 z = self.balance(x, False) 2422 if debug: 2423 print("debug-1", z, y[3]) 2424 assert z == y[3] 2425 o = self._vault['account'][x]['log'] 2426 z = 0 2427 for i in o: 2428 z += o[i]['value'] 2429 if debug: 2430 print("debug-2", z, type(z)) 2431 print("debug-2", y[4], type(y[4])) 2432 assert z == y[4] 2433 if debug: 2434 print('debug-2 - PASSED') 2435 assert self.box_size(x) == y[5] 2436 assert self.log_size(x) == y[6] 2437 assert not self.nolock() 2438 self.free(self.lock()) 2439 assert self.nolock() 2440 assert self.boxes(x) != {} 2441 assert self.logs(x) != {} 2442 2443 assert not self.hide(x) 2444 assert self.hide(x, False) is False 2445 assert self.hide(x) is False 2446 assert self.hide(x, True) 2447 assert self.hide(x) 2448 2449 assert self.zakatable(x) 2450 assert self.zakatable(x, False) is False 2451 assert self.zakatable(x) is False 2452 assert self.zakatable(x, True) 2453 assert self.zakatable(x) 2454 2455 if restore is True: 2456 count = len(self._vault['history']) 2457 if debug: 2458 print('history-count', count) 2459 assert count == 10 2460 # try mode 2461 for _ in range(count): 2462 assert self.recall(True, debug) 2463 count = len(self._vault['history']) 2464 if debug: 2465 print('history-count', count) 2466 assert count == 10 2467 _accounts = list(table.keys()) 2468 accounts_limit = len(_accounts) + 1 2469 for i in range(-1, -accounts_limit, -1): 2470 account = _accounts[i] 2471 if debug: 2472 print(account, len(table[account])) 2473 transaction_limit = len(table[account]) + 1 2474 for j in range(-1, -transaction_limit, -1): 2475 row = table[account][j] 2476 if debug: 2477 print(row, self.balance(account), self.balance(account, False)) 2478 assert self.balance(account) == self.balance(account, False) 2479 assert self.balance(account) == row[2] 2480 assert self.recall(False, debug) 2481 assert self.recall(False, debug) is False 2482 count = len(self._vault['history']) 2483 if debug: 2484 print('history-count', count) 2485 assert count == 0 2486 self.reset() 2487 2488 def test(self, debug: bool = False) -> bool: 2489 if debug: 2490 print('test', f'debug={debug}') 2491 try: 2492 2493 self._test_core(True, debug) 2494 self._test_core(False, debug) 2495 2496 assert self._history() 2497 2498 # Not allowed for duplicate transactions in the same account and time 2499 2500 created = ZakatTracker.time() 2501 self.track(100, 'test-1', 'same', True, created) 2502 failed = False 2503 try: 2504 self.track(50, 'test-1', 'same', True, created) 2505 except: 2506 failed = True 2507 assert failed is True 2508 2509 self.reset() 2510 2511 # Same account transfer 2512 for x in [1, 'a', True, 1.8, None]: 2513 failed = False 2514 try: 2515 self.transfer(1, x, x, 'same-account', debug=debug) 2516 except: 2517 failed = True 2518 assert failed is True 2519 2520 # Always preserve box age during transfer 2521 2522 series: list[tuple] = [ 2523 (30, 4), 2524 (60, 3), 2525 (90, 2), 2526 ] 2527 case = { 2528 3000: { 2529 'series': series, 2530 'rest': 15000, 2531 }, 2532 6000: { 2533 'series': series, 2534 'rest': 12000, 2535 }, 2536 9000: { 2537 'series': series, 2538 'rest': 9000, 2539 }, 2540 18000: { 2541 'series': series, 2542 'rest': 0, 2543 }, 2544 27000: { 2545 'series': series, 2546 'rest': -9000, 2547 }, 2548 36000: { 2549 'series': series, 2550 'rest': -18000, 2551 }, 2552 } 2553 2554 selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle() 2555 2556 for total in case: 2557 if debug: 2558 print('--------------------------------------------------------') 2559 print(f'case[{total}]', case[total]) 2560 for x in case[total]['series']: 2561 self.track( 2562 unscaled_value=x[0], 2563 desc=f"test-{x} ages", 2564 account='ages', 2565 logging=True, 2566 created=selected_time * x[1], 2567 ) 2568 2569 unscaled_total = self.unscale(total) 2570 if debug: 2571 print('unscaled_total', unscaled_total) 2572 refs = self.transfer( 2573 unscaled_amount=unscaled_total, 2574 from_account='ages', 2575 to_account='future', 2576 desc='Zakat Movement', 2577 debug=debug, 2578 ) 2579 2580 if debug: 2581 print('refs', refs) 2582 2583 ages_cache_balance = self.balance('ages') 2584 ages_fresh_balance = self.balance('ages', False) 2585 rest = case[total]['rest'] 2586 if debug: 2587 print('source', ages_cache_balance, ages_fresh_balance, rest) 2588 assert ages_cache_balance == rest 2589 assert ages_fresh_balance == rest 2590 2591 future_cache_balance = self.balance('future') 2592 future_fresh_balance = self.balance('future', False) 2593 if debug: 2594 print('target', future_cache_balance, future_fresh_balance, total) 2595 print('refs', refs) 2596 assert future_cache_balance == total 2597 assert future_fresh_balance == total 2598 2599 # TODO: check boxes times for `ages` should equal box times in `future` 2600 for ref in self._vault['account']['ages']['box']: 2601 ages_capital = self._vault['account']['ages']['box'][ref]['capital'] 2602 ages_rest = self._vault['account']['ages']['box'][ref]['rest'] 2603 future_capital = 0 2604 if ref in self._vault['account']['future']['box']: 2605 future_capital = self._vault['account']['future']['box'][ref]['capital'] 2606 future_rest = 0 2607 if ref in self._vault['account']['future']['box']: 2608 future_rest = self._vault['account']['future']['box'][ref]['rest'] 2609 if ages_capital != 0 and future_capital != 0 and future_rest != 0: 2610 if debug: 2611 print('================================================================') 2612 print('ages', ages_capital, ages_rest) 2613 print('future', future_capital, future_rest) 2614 if ages_rest == 0: 2615 assert ages_capital == future_capital 2616 elif ages_rest < 0: 2617 assert -ages_capital == future_capital 2618 elif ages_rest > 0: 2619 assert ages_capital == ages_rest + future_capital 2620 self.reset() 2621 assert len(self._vault['history']) == 0 2622 2623 assert self._history() 2624 assert self._history(False) is False 2625 assert self._history() is False 2626 assert self._history(True) 2627 assert self._history() 2628 if debug: 2629 print('####################################################################') 2630 2631 transaction = [ 2632 ( 2633 20, 'wallet', 1, -2000, -2000, -2000, 1, 1, 2634 2000, 2000, 2000, 1, 1, 2635 ), 2636 ( 2637 750, 'wallet', 'safe', -77000, -77000, -77000, 2, 2, 2638 75000, 75000, 75000, 1, 1, 2639 ), 2640 ( 2641 600, 'safe', 'bank', 15000, 15000, 15000, 1, 2, 2642 60000, 60000, 60000, 1, 1, 2643 ), 2644 ] 2645 for z in transaction: 2646 self.lock() 2647 x = z[1] 2648 y = z[2] 2649 self.transfer( 2650 unscaled_amount=z[0], 2651 from_account=x, 2652 to_account=y, 2653 desc='test-transfer', 2654 debug=debug, 2655 ) 2656 zz = self.balance(x) 2657 if debug: 2658 print(zz, z) 2659 assert zz == z[3] 2660 xx = self.accounts()[x] 2661 assert xx == z[3] 2662 assert self.balance(x, False) == z[4] 2663 assert xx == z[4] 2664 2665 s = 0 2666 log = self._vault['account'][x]['log'] 2667 for i in log: 2668 s += log[i]['value'] 2669 if debug: 2670 print('s', s, 'z[5]', z[5]) 2671 assert s == z[5] 2672 2673 assert self.box_size(x) == z[6] 2674 assert self.log_size(x) == z[7] 2675 2676 yy = self.accounts()[y] 2677 assert self.balance(y) == z[8] 2678 assert yy == z[8] 2679 assert self.balance(y, False) == z[9] 2680 assert yy == z[9] 2681 2682 s = 0 2683 log = self._vault['account'][y]['log'] 2684 for i in log: 2685 s += log[i]['value'] 2686 assert s == z[10] 2687 2688 assert self.box_size(y) == z[11] 2689 assert self.log_size(y) == z[12] 2690 assert self.free(self.lock()) 2691 2692 if debug: 2693 pp().pprint(self.check(2.17)) 2694 2695 assert not self.nolock() 2696 history_count = len(self._vault['history']) 2697 if debug: 2698 print('history-count', history_count) 2699 assert history_count == 4 2700 assert not self.free(ZakatTracker.time()) 2701 assert self.free(self.lock()) 2702 assert self.nolock() 2703 assert len(self._vault['history']) == 3 2704 2705 # storage 2706 2707 _path = self.path(f'./zakat_test_db/test.{self.ext()}') 2708 if os.path.exists(_path): 2709 os.remove(_path) 2710 self.save() 2711 assert os.path.getsize(_path) > 0 2712 self.reset() 2713 assert self.recall(False, debug) is False 2714 self.load() 2715 assert self._vault['account'] is not None 2716 2717 # recall 2718 2719 assert self.nolock() 2720 assert len(self._vault['history']) == 3 2721 assert self.recall(False, debug) is True 2722 assert len(self._vault['history']) == 2 2723 assert self.recall(False, debug) is True 2724 assert len(self._vault['history']) == 1 2725 assert self.recall(False, debug) is True 2726 assert len(self._vault['history']) == 0 2727 assert self.recall(False, debug) is False 2728 assert len(self._vault['history']) == 0 2729 2730 # exchange 2731 2732 self.exchange("cash", 25, 3.75, "2024-06-25") 2733 self.exchange("cash", 22, 3.73, "2024-06-22") 2734 self.exchange("cash", 15, 3.69, "2024-06-15") 2735 self.exchange("cash", 10, 3.66) 2736 2737 for i in range(1, 30): 2738 exchange = self.exchange("cash", i) 2739 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2740 if debug: 2741 print(i, rate, description, created) 2742 assert created 2743 if i < 10: 2744 assert rate == 1 2745 assert description is None 2746 elif i == 10: 2747 assert rate == 3.66 2748 assert description is None 2749 elif i < 15: 2750 assert rate == 3.66 2751 assert description is None 2752 elif i == 15: 2753 assert rate == 3.69 2754 assert description is not None 2755 elif i < 22: 2756 assert rate == 3.69 2757 assert description is not None 2758 elif i == 22: 2759 assert rate == 3.73 2760 assert description is not None 2761 elif i >= 25: 2762 assert rate == 3.75 2763 assert description is not None 2764 exchange = self.exchange("bank", i) 2765 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2766 if debug: 2767 print(i, rate, description, created) 2768 assert created 2769 assert rate == 1 2770 assert description is None 2771 2772 assert len(self._vault['exchange']) > 0 2773 assert len(self.exchanges()) > 0 2774 self._vault['exchange'].clear() 2775 assert len(self._vault['exchange']) == 0 2776 assert len(self.exchanges()) == 0 2777 2778 # حفظ أسعار الصرف باستخدام التواريخ بالنانو ثانية 2779 self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25") 2780 self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22") 2781 self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15") 2782 self.exchange("cash", ZakatTracker.day_to_time(10), 3.66) 2783 2784 for i in [x * 0.12 for x in range(-15, 21)]: 2785 if i <= 0: 2786 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0 2787 else: 2788 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0 2789 2790 # اختبار النتائج باستخدام التواريخ بالنانو ثانية 2791 for i in range(1, 31): 2792 timestamp_ns = ZakatTracker.day_to_time(i) 2793 exchange = self.exchange("cash", timestamp_ns) 2794 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2795 if debug: 2796 print(i, rate, description, created) 2797 assert created 2798 if i < 10: 2799 assert rate == 1 2800 assert description is None 2801 elif i == 10: 2802 assert rate == 3.66 2803 assert description is None 2804 elif i < 15: 2805 assert rate == 3.66 2806 assert description is None 2807 elif i == 15: 2808 assert rate == 3.69 2809 assert description is not None 2810 elif i < 22: 2811 assert rate == 3.69 2812 assert description is not None 2813 elif i == 22: 2814 assert rate == 3.73 2815 assert description is not None 2816 elif i >= 25: 2817 assert rate == 3.75 2818 assert description is not None 2819 exchange = self.exchange("bank", i) 2820 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2821 if debug: 2822 print(i, rate, description, created) 2823 assert created 2824 assert rate == 1 2825 assert description is None 2826 2827 # csv 2828 2829 csv_count = 1000 2830 2831 for with_rate, path in { 2832 False: 'test-import_csv-no-exchange', 2833 True: 'test-import_csv-with-exchange', 2834 }.items(): 2835 2836 if debug: 2837 print('test_import_csv', with_rate, path) 2838 2839 csv_path = path + '.csv' 2840 if os.path.exists(csv_path): 2841 os.remove(csv_path) 2842 c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug) 2843 if debug: 2844 print('generate_random_csv_file', c) 2845 assert c == csv_count 2846 assert os.path.getsize(csv_path) > 0 2847 cache_path = self.import_csv_cache_path() 2848 if os.path.exists(cache_path): 2849 os.remove(cache_path) 2850 self.reset() 2851 (created, found, bad) = self.import_csv(csv_path, debug) 2852 bad_count = len(bad) 2853 assert bad_count > 0 2854 if debug: 2855 print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})") 2856 print('bad', bad) 2857 tmp_size = os.path.getsize(cache_path) 2858 assert tmp_size > 0 2859 # TODO: assert created + found + bad_count == csv_count 2860 # TODO: assert created == csv_count 2861 # TODO: assert bad_count == 0 2862 (created_2, found_2, bad_2) = self.import_csv(csv_path) 2863 bad_2_count = len(bad_2) 2864 if debug: 2865 print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})") 2866 print('bad', bad) 2867 assert bad_2_count > 0 2868 # TODO: assert tmp_size == os.path.getsize(cache_path) 2869 # TODO: assert created_2 + found_2 + bad_2_count == csv_count 2870 # TODO: assert created == found_2 2871 # TODO: assert bad_count == bad_2_count 2872 # TODO: assert found_2 == csv_count 2873 # TODO: assert bad_2_count == 0 2874 # TODO: assert created_2 == 0 2875 2876 # payment parts 2877 2878 positive_parts = self.build_payment_parts(100, positive_only=True) 2879 assert self.check_payment_parts(positive_parts) != 0 2880 assert self.check_payment_parts(positive_parts) != 0 2881 all_parts = self.build_payment_parts(300, positive_only=False) 2882 assert self.check_payment_parts(all_parts) != 0 2883 assert self.check_payment_parts(all_parts) != 0 2884 if debug: 2885 pp().pprint(positive_parts) 2886 pp().pprint(all_parts) 2887 # dynamic discount 2888 suite = [] 2889 count = 3 2890 for exceed in [False, True]: 2891 case = [] 2892 for parts in [positive_parts, all_parts]: 2893 part = parts.copy() 2894 demand = part['demand'] 2895 if debug: 2896 print(demand, part['total']) 2897 i = 0 2898 z = demand / count 2899 cp = { 2900 'account': {}, 2901 'demand': demand, 2902 'exceed': exceed, 2903 'total': part['total'], 2904 } 2905 j = '' 2906 for x, y in part['account'].items(): 2907 x_exchange = self.exchange(x) 2908 zz = self.exchange_calc(z, 1, x_exchange['rate']) 2909 if exceed and zz <= demand: 2910 i += 1 2911 y['part'] = zz 2912 if debug: 2913 print(exceed, y) 2914 cp['account'][x] = y 2915 case.append(y) 2916 elif not exceed and y['balance'] >= zz: 2917 i += 1 2918 y['part'] = zz 2919 if debug: 2920 print(exceed, y) 2921 cp['account'][x] = y 2922 case.append(y) 2923 j = x 2924 if i >= count: 2925 break 2926 if len(cp['account'][j]) > 0: 2927 suite.append(cp) 2928 if debug: 2929 print('suite', len(suite)) 2930 # vault = self._vault.copy() 2931 for case in suite: 2932 # self._vault = vault.copy() 2933 if debug: 2934 print('case', case) 2935 result = self.check_payment_parts(case) 2936 if debug: 2937 print('check_payment_parts', result, f'exceed: {exceed}') 2938 assert result == 0 2939 2940 report = self.check(2.17, None, debug) 2941 (valid, brief, plan) = report 2942 if debug: 2943 print('valid', valid) 2944 zakat_result = self.zakat(report, parts=case, debug=debug) 2945 if debug: 2946 print('zakat-result', zakat_result) 2947 assert valid == zakat_result 2948 2949 assert self.save(path + f'.{self.ext()}') 2950 assert self.export_json(path + '.json') 2951 2952 assert self.export_json("1000-transactions-test.json") 2953 assert self.save(f"1000-transactions-test.{self.ext()}") 2954 2955 self.reset() 2956 2957 # test transfer between accounts with different exchange rate 2958 2959 a_SAR = "Bank (SAR)" 2960 b_USD = "Bank (USD)" 2961 c_SAR = "Safe (SAR)" 2962 # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer 2963 for case in [ 2964 (0, a_SAR, "SAR Gift", 1000, 100000), 2965 (1, a_SAR, 1), 2966 (0, b_USD, "USD Gift", 500, 50000), 2967 (1, b_USD, 1), 2968 (2, b_USD, 3.75), 2969 (1, b_USD, 3.75), 2970 (3, 100, b_USD, a_SAR, "100 USD -> SAR", 40000, 137500), 2971 (0, c_SAR, "Salary", 750, 75000), 2972 (3, 375, c_SAR, b_USD, "375 SAR -> USD", 37500, 50000), 2973 (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 137125, 50100), 2974 ]: 2975 if debug: 2976 print('case', case) 2977 match (case[0]): 2978 case 0: # track 2979 _, account, desc, x, balance = case 2980 self.track(unscaled_value=x, desc=desc, account=account, debug=debug) 2981 2982 cached_value = self.balance(account, cached=True) 2983 fresh_value = self.balance(account, cached=False) 2984 if debug: 2985 print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value) 2986 assert cached_value == balance 2987 assert fresh_value == balance 2988 case 1: # check-exchange 2989 _, account, expected_rate = case 2990 t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2991 if debug: 2992 print('t-exchange', t_exchange) 2993 assert t_exchange['rate'] == expected_rate 2994 case 2: # do-exchange 2995 _, account, rate = case 2996 self.exchange(account, rate=rate, debug=debug) 2997 b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2998 if debug: 2999 print('b-exchange', b_exchange) 3000 assert b_exchange['rate'] == rate 3001 case 3: # transfer 3002 _, x, a, b, desc, a_balance, b_balance = case 3003 self.transfer(x, a, b, desc, debug=debug) 3004 3005 cached_value = self.balance(a, cached=True) 3006 fresh_value = self.balance(a, cached=False) 3007 if debug: 3008 print('account', a, 'cached_value', cached_value, 'fresh_value', fresh_value, 'a_balance', a_balance) 3009 assert cached_value == a_balance 3010 assert fresh_value == a_balance 3011 3012 cached_value = self.balance(b, cached=True) 3013 fresh_value = self.balance(b, cached=False) 3014 if debug: 3015 print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value) 3016 assert cached_value == b_balance 3017 assert fresh_value == b_balance 3018 3019 # Transfer all in many chunks randomly from B to A 3020 a_SAR_balance = 137125 3021 b_USD_balance = 50100 3022 b_USD_exchange = self.exchange(b_USD) 3023 amounts = ZakatTracker.create_random_list(b_USD_balance, max_value=1000) 3024 if debug: 3025 print('amounts', amounts) 3026 i = 0 3027 for x in amounts: 3028 if debug: 3029 print(f'{i} - transfer-with-exchange({x})') 3030 self.transfer( 3031 unscaled_amount=self.unscale(x), 3032 from_account=b_USD, 3033 to_account=a_SAR, 3034 desc=f"{x} USD -> SAR", 3035 debug=debug, 3036 ) 3037 3038 b_USD_balance -= x 3039 cached_value = self.balance(b_USD, cached=True) 3040 fresh_value = self.balance(b_USD, cached=False) 3041 if debug: 3042 print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3043 b_USD_balance) 3044 assert cached_value == b_USD_balance 3045 assert fresh_value == b_USD_balance 3046 3047 a_SAR_balance += int(x * b_USD_exchange['rate']) 3048 cached_value = self.balance(a_SAR, cached=True) 3049 fresh_value = self.balance(a_SAR, cached=False) 3050 if debug: 3051 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3052 a_SAR_balance, 'rate', b_USD_exchange['rate']) 3053 assert cached_value == a_SAR_balance 3054 assert fresh_value == a_SAR_balance 3055 i += 1 3056 3057 # Transfer all in many chunks randomly from C to A 3058 c_SAR_balance = 37500 3059 amounts = ZakatTracker.create_random_list(c_SAR_balance, max_value=1000) 3060 if debug: 3061 print('amounts', amounts) 3062 i = 0 3063 for x in amounts: 3064 if debug: 3065 print(f'{i} - transfer-with-exchange({x})') 3066 self.transfer( 3067 unscaled_amount=self.unscale(x), 3068 from_account=c_SAR, 3069 to_account=a_SAR, 3070 desc=f"{x} SAR -> a_SAR", 3071 debug=debug, 3072 ) 3073 3074 c_SAR_balance -= x 3075 cached_value = self.balance(c_SAR, cached=True) 3076 fresh_value = self.balance(c_SAR, cached=False) 3077 if debug: 3078 print('account', c_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3079 c_SAR_balance) 3080 assert cached_value == c_SAR_balance 3081 assert fresh_value == c_SAR_balance 3082 3083 a_SAR_balance += x 3084 cached_value = self.balance(a_SAR, cached=True) 3085 fresh_value = self.balance(a_SAR, cached=False) 3086 if debug: 3087 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3088 a_SAR_balance) 3089 assert cached_value == a_SAR_balance 3090 assert fresh_value == a_SAR_balance 3091 i += 1 3092 3093 assert self.export_json("accounts-transfer-with-exchange-rates.json") 3094 assert self.save(f"accounts-transfer-with-exchange-rates.{self.ext()}") 3095 3096 # check & zakat with exchange rates for many cycles 3097 3098 for rate, values in { 3099 1: { 3100 'in': [1000, 2000, 10000], 3101 'exchanged': [100000, 200000, 1000000], 3102 'out': [2500, 5000, 73140], 3103 }, 3104 3.75: { 3105 'in': [200, 1000, 5000], 3106 'exchanged': [75000, 375000, 1875000], 3107 'out': [1875, 9375, 137138], 3108 }, 3109 }.items(): 3110 a, b, c = values['in'] 3111 m, n, o = values['exchanged'] 3112 x, y, z = values['out'] 3113 if debug: 3114 print('rate', rate, 'values', values) 3115 for case in [ 3116 (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3117 {'safe': {0: {'below_nisab': x}}}, 3118 ], False, m), 3119 (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3120 {'safe': {0: {'count': 1, 'total': y}}}, 3121 ], True, n), 3122 (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [ 3123 {'cave': {0: {'count': 3, 'total': z}}}, 3124 ], True, o), 3125 ]: 3126 if debug: 3127 print(f"############# check(rate: {rate}) #############") 3128 print('case', case) 3129 self.reset() 3130 self.exchange(account=case[1], created=case[2], rate=rate) 3131 self.track(unscaled_value=case[0], desc='test-check', account=case[1], logging=True, created=case[2]) 3132 assert self.snapshot() 3133 3134 # assert self.nolock() 3135 # history_size = len(self._vault['history']) 3136 # print('history_size', history_size) 3137 # assert history_size == 2 3138 assert self.lock() 3139 assert not self.nolock() 3140 report = self.check(2.17, None, debug) 3141 (valid, brief, plan) = report 3142 if debug: 3143 print('brief', brief) 3144 assert valid == case[4] 3145 assert case[5] == brief[0] 3146 assert case[5] == brief[1] 3147 3148 if debug: 3149 pp().pprint(plan) 3150 3151 for x in plan: 3152 assert case[1] == x 3153 if 'total' in case[3][0][x][0].keys(): 3154 assert case[3][0][x][0]['total'] == int(brief[2]) 3155 assert int(plan[x][0]['total']) == case[3][0][x][0]['total'] 3156 assert int(plan[x][0]['count']) == case[3][0][x][0]['count'] 3157 else: 3158 assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab'] 3159 if debug: 3160 pp().pprint(report) 3161 result = self.zakat(report, debug=debug) 3162 if debug: 3163 print('zakat-result', result, case[4]) 3164 assert result == case[4] 3165 report = self.check(2.17, None, debug) 3166 (valid, brief, plan) = report 3167 assert valid is False 3168 3169 history_size = len(self._vault['history']) 3170 if debug: 3171 print('history_size', history_size) 3172 assert history_size == 3 3173 assert not self.nolock() 3174 assert self.recall(False, debug) is False 3175 self.free(self.lock()) 3176 assert self.nolock() 3177 3178 for i in range(3, 0, -1): 3179 history_size = len(self._vault['history']) 3180 if debug: 3181 print('history_size', history_size) 3182 assert history_size == i 3183 assert self.recall(False, debug) is True 3184 3185 assert self.nolock() 3186 assert self.recall(False, debug) is False 3187 3188 history_size = len(self._vault['history']) 3189 if debug: 3190 print('history_size', history_size) 3191 assert history_size == 0 3192 3193 account_size = len(self._vault['account']) 3194 if debug: 3195 print('account_size', account_size) 3196 assert account_size == 0 3197 3198 report_size = len(self._vault['report']) 3199 if debug: 3200 print('report_size', report_size) 3201 assert report_size == 0 3202 3203 assert self.nolock() 3204 return True 3205 except Exception as e: 3206 # pp().pprint(self._vault) 3207 assert self.export_json("test-snapshot.json") 3208 assert self.save(f"test-snapshot.{self.ext()}") 3209 raise e
A class for tracking and calculating Zakat.
This class provides functionalities for recording transactions, calculating Zakat due, and managing account balances. It also offers features like importing transactions from CSV files, exporting data to JSON format, and saving/loading the tracker state.
The ZakatTracker
class is designed to handle both positive and negative transactions,
allowing for flexible tracking of financial activities related to Zakat. It also supports
the concept of a "Nisab" (minimum threshold for Zakat) and a "haul" (complete one year for Transaction) can calculate Zakat due
based on the current silver price.
The class uses a camel file as its database to persist the tracker state, ensuring data integrity across sessions. It also provides options for enabling or disabling history tracking, allowing users to choose their preferred level of detail.
In addition, the ZakatTracker
class includes various helper methods like
time
, time_to_datetime
, lock
, free
, recall
, export_json
,
and more. These methods provide additional functionalities and flexibility
for interacting with and managing the Zakat tracker.
Attributes: ZakatTracker.ZakatCut (function): A function to calculate the Zakat percentage. ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat. ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price. ZakatTracker.Version (function): The version of the ZakatTracker class.
Data Structure: The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data.
_vault (dict):
- account (dict):
- {account_number} (dict):
- balance (int): The current balance of the account.
- box (dict): A dictionary storing transaction details.
- {timestamp} (dict):
- capital (int): The initial amount of the transaction.
- count (int): The number of times Zakat has been calculated for this transaction.
- last (int): The timestamp of the last Zakat calculation.
- rest (int): The remaining amount after Zakat deductions and withdrawal.
- total (int): The total Zakat deducted from this transaction.
- count (int): The total number of transactions for the account.
- log (dict): A dictionary storing transaction logs.
- {timestamp} (dict):
- value (int): The transaction amount (positive or negative).
- desc (str): The description of the transaction.
- ref (int): The box reference (positive or None).
- file (dict): A dictionary storing file references associated with the transaction.
- hide (bool): Indicates whether the account is hidden or not.
- zakatable (bool): Indicates whether the account is subject to Zakat.
- exchange (dict):
- account (dict):
- {timestamps} (dict):
- rate (float): Exchange rate when compared to local currency.
- description (str): The description of the exchange rate.
- history (dict):
- {timestamp} (list): A list of dictionaries storing the history of actions performed.
- {action_dict} (dict):
- action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT).
- account (str): The account number associated with the action.
- ref (int): The reference number of the transaction.
- file (int): The reference number of the file (if applicable).
- key (str): The key associated with the action (e.g., 'rest', 'total').
- value (int): The value associated with the action.
- math (MathOperation): The mathematical operation performed (if applicable).
- lock (int or None): The timestamp indicating the current lock status (None if not locked).
- report (dict):
- {timestamp} (tuple): A tuple storing Zakat report details.
270 def __init__(self, db_path: str = "./zakat_db/zakat.camel", history_mode: bool = True): 271 """ 272 Initialize ZakatTracker with database path and history mode. 273 274 Parameters: 275 db_path (str): The path to the database file. Default is "zakat.camel". 276 history_mode (bool): The mode for tracking history. Default is True. 277 278 Returns: 279 None 280 """ 281 self._base_path = None 282 self._vault_path = None 283 self._vault = None 284 self.reset() 285 self._history(history_mode) 286 self.path(db_path)
Initialize ZakatTracker with database path and history mode.
Parameters: db_path (str): The path to the database file. Default is "zakat.camel". history_mode (bool): The mode for tracking history. Default is True.
Returns: None
194 @staticmethod 195 def Version() -> str: 196 """ 197 Returns the current version of the software. 198 199 This function returns a string representing the current version of the software, 200 including major, minor, and patch version numbers in the format "X.Y.Z". 201 202 Returns: 203 str: The current version of the software. 204 """ 205 return '0.2.88'
Returns the current version of the software.
This function returns a string representing the current version of the software, including major, minor, and patch version numbers in the format "X.Y.Z".
Returns: str: The current version of the software.
207 @staticmethod 208 def ZakatCut(x: float) -> float: 209 """ 210 Calculates the Zakat amount due on an asset. 211 212 This function calculates the zakat amount due on a given asset value over one lunar year. 213 Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth 214 that exceeds a certain threshold (Nisab). 215 216 Parameters: 217 x: The total value of the asset on which Zakat is to be calculated. 218 219 Returns: 220 The amount of Zakat due on the asset, calculated as 2.5% of the asset's value. 221 """ 222 return 0.025 * x # Zakat Cut in one Lunar Year
Calculates the Zakat amount due on an asset.
This function calculates the zakat amount due on a given asset value over one lunar year. Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth that exceeds a certain threshold (Nisab).
Parameters: x: The total value of the asset on which Zakat is to be calculated.
Returns: The amount of Zakat due on the asset, calculated as 2.5% of the asset's value.
224 @staticmethod 225 def TimeCycle(days: int = 355) -> int: 226 """ 227 Calculates the approximate duration of a lunar year in nanoseconds. 228 229 This function calculates the approximate duration of a lunar year based on the given number of days. 230 It converts the given number of days into nanoseconds for use in high-precision timing applications. 231 232 Parameters: 233 days: The number of days in a lunar year. Defaults to 355, 234 which is an approximation of the average length of a lunar year. 235 236 Returns: 237 The approximate duration of a lunar year in nanoseconds. 238 """ 239 return int(60 * 60 * 24 * days * 1e9) # Lunar Year in nanoseconds
Calculates the approximate duration of a lunar year in nanoseconds.
This function calculates the approximate duration of a lunar year based on the given number of days. It converts the given number of days into nanoseconds for use in high-precision timing applications.
Parameters: days: The number of days in a lunar year. Defaults to 355, which is an approximation of the average length of a lunar year.
Returns: The approximate duration of a lunar year in nanoseconds.
241 @staticmethod 242 def Nisab(gram_price: float, gram_quantity: float = 595) -> float: 243 """ 244 Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram. 245 246 This function calculates the Nisab value, which is the minimum threshold of wealth, 247 that makes an individual liable for paying Zakat. 248 The Nisab value is determined by the equivalent value of a specific amount 249 of gold or silver (currently 595 grams in silver) in the local currency. 250 251 Parameters: 252 - gram_price (float): The price per gram of Nisab. 253 - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver. 254 255 Returns: 256 - float: The total value of Nisab based on the given price per gram. 257 """ 258 return gram_price * gram_quantity
Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram.
This function calculates the Nisab value, which is the minimum threshold of wealth, that makes an individual liable for paying Zakat. The Nisab value is determined by the equivalent value of a specific amount of gold or silver (currently 595 grams in silver) in the local currency.
Parameters:
- gram_price (float): The price per gram of Nisab.
- gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver.
Returns:
- float: The total value of Nisab based on the given price per gram.
260 @staticmethod 261 def ext() -> str: 262 """ 263 Returns the file extension used by the ZakatTracker class. 264 265 Returns: 266 str: The file extension used by the ZakatTracker class, which is 'camel'. 267 """ 268 return 'camel'
Returns the file extension used by the ZakatTracker class.
Returns: str: The file extension used by the ZakatTracker class, which is 'camel'.
288 def path(self, path: str = None) -> str: 289 """ 290 Set or get the path to the database file. 291 292 If no path is provided, the current path is returned. 293 If a path is provided, it is set as the new path. 294 The function also creates the necessary directories if the provided path is a file. 295 296 Parameters: 297 path (str): The new path to the database file. If not provided, the current path is returned. 298 299 Returns: 300 str: The current or new path to the database file. 301 """ 302 if path is None: 303 return self._vault_path 304 self._vault_path = Path(path).resolve() 305 base_path = Path(path).resolve() 306 if base_path.is_file() or base_path.suffix: 307 base_path = base_path.parent 308 base_path.mkdir(parents=True, exist_ok=True) 309 self._base_path = base_path 310 return str(self._vault_path)
Set or get the path to the database file.
If no path is provided, the current path is returned. If a path is provided, it is set as the new path. The function also creates the necessary directories if the provided path is a file.
Parameters: path (str): The new path to the database file. If not provided, the current path is returned.
Returns: str: The current or new path to the database file.
312 def base_path(self, *args) -> str: 313 """ 314 Generate a base path by joining the provided arguments with the existing base path. 315 316 Parameters: 317 *args (str): Variable length argument list of strings to be joined with the base path. 318 319 Returns: 320 str: The generated base path. If no arguments are provided, the existing base path is returned. 321 """ 322 if not args: 323 return str(self._base_path) 324 filtered_args = [] 325 ignored_filename = None 326 for arg in args: 327 if Path(arg).suffix: 328 ignored_filename = arg 329 else: 330 filtered_args.append(arg) 331 base_path = Path(self._base_path) 332 full_path = base_path.joinpath(*filtered_args) 333 full_path.mkdir(parents=True, exist_ok=True) 334 if ignored_filename is not None: 335 return full_path.resolve() / ignored_filename # Join with the ignored filename 336 return str(full_path.resolve())
Generate a base path by joining the provided arguments with the existing base path.
Parameters: *args (str): Variable length argument list of strings to be joined with the base path.
Returns: str: The generated base path. If no arguments are provided, the existing base path is returned.
338 @staticmethod 339 def scale(x: float | int | Decimal, decimal_places: int = 2) -> int: 340 """ 341 Scales a numerical value by a specified power of 10, returning an integer. 342 343 This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and 344 facilitate precise scaling operations, particularly useful in financial or scientific calculations. 345 346 Parameters: 347 x: The numeric value to scale. Can be a floating-point number, integer, or decimal. 348 decimal_places: The exponent for the scaling factor (10**y). Defaults to 2, meaning the input is scaled 349 by a factor of 100 (e.g., converts 1.23 to 123). 350 351 Returns: 352 The scaled value, rounded to the nearest integer. 353 354 Raises: 355 TypeError: If the input `x` is not a valid numeric type. 356 357 Examples: 358 >>> ZakatTracker.scale(3.14159) 359 314 360 >>> ZakatTracker.scale(1234, decimal_places=3) 361 1234000 362 >>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4) 363 50 364 """ 365 if not isinstance(x, (float, int, Decimal)): 366 raise TypeError("Input 'x' must be a float, int, or Decimal.") 367 return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))
Scales a numerical value by a specified power of 10, returning an integer.
This function is designed to handle various numeric types (float
, int
, or Decimal
) and
facilitate precise scaling operations, particularly useful in financial or scientific calculations.
Parameters: x: The numeric value to scale. Can be a floating-point number, integer, or decimal. decimal_places: The exponent for the scaling factor (10**y). Defaults to 2, meaning the input is scaled by a factor of 100 (e.g., converts 1.23 to 123).
Returns: The scaled value, rounded to the nearest integer.
Raises:
TypeError: If the input x
is not a valid numeric type.
Examples:
>>> ZakatTracker.scale(3.14159)
314
>>> ZakatTracker.scale(1234, decimal_places=3)
1234000
>>> ZakatTracker.scale(Decimal("0.005"), decimal_places=4)
50
369 @staticmethod 370 def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal: 371 """ 372 Unscales an integer by a power of 10. 373 374 Parameters: 375 x: The integer to unscale. 376 return_type: The desired type for the returned value. Can be float, int, or Decimal. Defaults to float. 377 decimal_places: The power of 10 to use. Defaults to 2. 378 379 Returns: 380 The unscaled number, converted to the specified return_type. 381 382 Raises: 383 TypeError: If the return_type is not float or Decimal. 384 """ 385 if return_type not in (float, Decimal): 386 raise TypeError(f'Invalid return_type({return_type}). Supported types are float, int, and Decimal.') 387 return round(return_type(x / (10 ** decimal_places)), decimal_places)
Unscales an integer by a power of 10.
Parameters: x: The integer to unscale. return_type: The desired type for the returned value. Can be float, int, or Decimal. Defaults to float. decimal_places: The power of 10 to use. Defaults to 2.
Returns: The unscaled number, converted to the specified return_type.
Raises: TypeError: If the return_type is not float or Decimal.
403 def reset(self) -> None: 404 """ 405 Reset the internal data structure to its initial state. 406 407 Parameters: 408 None 409 410 Returns: 411 None 412 """ 413 self._vault = { 414 'account': {}, 415 'exchange': {}, 416 'history': {}, 417 'lock': None, 418 'report': {}, 419 }
Reset the internal data structure to its initial state.
Parameters: None
Returns: None
421 @staticmethod 422 def time(now: datetime = None) -> int: 423 """ 424 Generates a timestamp based on the provided datetime object or the current datetime. 425 426 Parameters: 427 now (datetime, optional): The datetime object to generate the timestamp from. 428 If not provided, the current datetime is used. 429 430 Returns: 431 int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970), 432 before 1970 will return in negative until 1000AD. 433 """ 434 if now is None: 435 now = datetime.datetime.now() 436 ordinal_day = now.toordinal() 437 ns_in_day = (now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() * 10 ** 9 438 return int((ordinal_day - 719_163) * 86_400_000_000_000 + ns_in_day)
Generates a timestamp based on the provided datetime object or the current datetime.
Parameters: now (datetime, optional): The datetime object to generate the timestamp from. If not provided, the current datetime is used.
Returns: int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970), before 1970 will return in negative until 1000AD.
440 @staticmethod 441 def time_to_datetime(ordinal_ns: int) -> datetime: 442 """ 443 Converts an ordinal number (number of days since 1000-01-01) to a datetime object. 444 445 Parameters: 446 ordinal_ns (int): The ordinal number of days since 1000-01-01. 447 448 Returns: 449 datetime: The corresponding datetime object. 450 """ 451 ordinal_day = ordinal_ns // 86_400_000_000_000 + 719_163 452 ns_in_day = ordinal_ns % 86_400_000_000_000 453 d = datetime.datetime.fromordinal(ordinal_day) 454 t = datetime.timedelta(seconds=ns_in_day // 10 ** 9) 455 return datetime.datetime.combine(d, datetime.time()) + t
Converts an ordinal number (number of days since 1000-01-01) to a datetime object.
Parameters: ordinal_ns (int): The ordinal number of days since 1000-01-01.
Returns: datetime: The corresponding datetime object.
457 def clean_history(self, lock: int | None = None) -> int: 458 """ 459 Cleans up the history of actions performed on the ZakatTracker instance. 460 461 Parameters: 462 lock (int, optional): The lock ID is used to clean up the empty history. 463 If not provided, it cleans up the empty history records for all locks. 464 465 Returns: 466 int: The number of locks cleaned up. 467 """ 468 count = 0 469 if lock in self._vault['history']: 470 if len(self._vault['history'][lock]) <= 0: 471 count += 1 472 del self._vault['history'][lock] 473 return count 474 self.free(self.lock()) 475 for lock in self._vault['history']: 476 if len(self._vault['history'][lock]) <= 0: 477 count += 1 478 del self._vault['history'][lock] 479 return count
Cleans up the history of actions performed on the ZakatTracker instance.
Parameters: lock (int, optional): The lock ID is used to clean up the empty history. If not provided, it cleans up the empty history records for all locks.
Returns: int: The number of locks cleaned up.
517 def nolock(self) -> bool: 518 """ 519 Check if the vault lock is currently not set. 520 521 Returns: 522 bool: True if the vault lock is not set, False otherwise. 523 """ 524 return self._vault['lock'] is None
Check if the vault lock is currently not set.
Returns: bool: True if the vault lock is not set, False otherwise.
526 def lock(self) -> int: 527 """ 528 Acquires a lock on the ZakatTracker instance. 529 530 Returns: 531 int: The lock ID. This ID can be used to release the lock later. 532 """ 533 return self._step()
Acquires a lock on the ZakatTracker instance.
Returns: int: The lock ID. This ID can be used to release the lock later.
535 def vault(self) -> dict: 536 """ 537 Returns a copy of the internal vault dictionary. 538 539 This method is used to retrieve the current state of the ZakatTracker object. 540 It provides a snapshot of the internal data structure, allowing for further 541 processing or analysis. 542 543 Returns: 544 dict: A copy of the internal vault dictionary. 545 """ 546 return self._vault.copy()
Returns a copy of the internal vault dictionary.
This method is used to retrieve the current state of the ZakatTracker object. It provides a snapshot of the internal data structure, allowing for further processing or analysis.
Returns: dict: A copy of the internal vault dictionary.
548 def stats(self) -> dict[str, tuple]: 549 """ 550 Calculates and returns statistics about the object's data storage. 551 552 This method determines the size of the database file on disk and the 553 size of the data currently held in RAM (likely within a dictionary). 554 Both sizes are reported in bytes and in a human-readable format 555 (e.g., KB, MB). 556 557 Returns: 558 dict[str, tuple]: A dictionary containing the following statistics: 559 560 * 'database': A tuple with two elements: 561 - The database file size in bytes (int). 562 - The database file size in human-readable format (str). 563 * 'ram': A tuple with two elements: 564 - The RAM usage (dictionary size) in bytes (int). 565 - The RAM usage in human-readable format (str). 566 567 Example: 568 >>> stats = my_object.stats() 569 >>> print(stats['database']) 570 (256000, '250.0 KB') 571 >>> print(stats['ram']) 572 (12345, '12.1 KB') 573 """ 574 ram_size = self.get_dict_size(self.vault()) 575 file_size = os.path.getsize(self.path()) 576 return { 577 'database': (file_size, self.human_readable_size(file_size)), 578 'ram': (ram_size, self.human_readable_size(ram_size)), 579 }
Calculates and returns statistics about the object's data storage.
This method determines the size of the database file on disk and the size of the data currently held in RAM (likely within a dictionary). Both sizes are reported in bytes and in a human-readable format (e.g., KB, MB).
Returns: dict[str, tuple]: A dictionary containing the following statistics:
* 'database': A tuple with two elements:
- The database file size in bytes (int).
- The database file size in human-readable format (str).
* 'ram': A tuple with two elements:
- The RAM usage (dictionary size) in bytes (int).
- The RAM usage in human-readable format (str).
Example:
>>> stats = my_object.stats()
>>> print(stats['database'])
(256000, '250.0 KB')
>>> print(stats['ram'])
(12345, '12.1 KB')
581 def files(self) -> list[dict[str, str | int]]: 582 """ 583 Retrieves information about files associated with this class. 584 585 This class method provides a standardized way to gather details about 586 files used by the class for storage, snapshots, and CSV imports. 587 588 Returns: 589 list[dict[str, str | int]]: A list of dictionaries, each containing information 590 about a specific file: 591 592 * type (str): The type of file ('database', 'snapshot', 'import_csv'). 593 * path (str): The full file path. 594 * exists (bool): Whether the file exists on the filesystem. 595 * size (int): The file size in bytes (0 if the file doesn't exist). 596 * human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB'). 597 598 Example: 599 ``` 600 file_info = MyClass.files() 601 for info in file_info: 602 print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}") 603 ``` 604 """ 605 result = [] 606 for file_type, path in { 607 'database': self.path(), 608 'snapshot': self.snapshot_cache_path(), 609 'import_csv': self.import_csv_cache_path(), 610 }.items(): 611 exists = os.path.exists(path) 612 size = os.path.getsize(path) if exists else 0 613 human_readable_size = self.human_readable_size(size) if exists else 0 614 result.append({ 615 'type': file_type, 616 'path': path, 617 'exists': exists, 618 'size': size, 619 'human_readable_size': human_readable_size, 620 }) 621 return result
Retrieves information about files associated with this class.
This class method provides a standardized way to gather details about files used by the class for storage, snapshots, and CSV imports.
Returns: list[dict[str, str | int]]: A list of dictionaries, each containing information about a specific file:
* type (str): The type of file ('database', 'snapshot', 'import_csv').
* path (str): The full file path.
* exists (bool): Whether the file exists on the filesystem.
* size (int): The file size in bytes (0 if the file doesn't exist).
* human_readable_size (str): A human-friendly representation of the file size (e.g., '10 KB', '2.5 MB').
Example:
file_info = MyClass.files()
for info in file_info:
print(f"Type: {info['type']}, Exists: {info['exists']}, Size: {info['human_readable_size']}")
623 def steps(self) -> dict: 624 """ 625 Returns a copy of the history of steps taken in the ZakatTracker. 626 627 The history is a dictionary where each key is a unique identifier for a step, 628 and the corresponding value is a dictionary containing information about the step. 629 630 Returns: 631 dict: A copy of the history of steps taken in the ZakatTracker. 632 """ 633 return self._vault['history'].copy()
Returns a copy of the history of steps taken in the ZakatTracker.
The history is a dictionary where each key is a unique identifier for a step, and the corresponding value is a dictionary containing information about the step.
Returns: dict: A copy of the history of steps taken in the ZakatTracker.
635 def free(self, lock: int, auto_save: bool = True) -> bool: 636 """ 637 Releases the lock on the database. 638 639 Parameters: 640 lock (int): The lock ID to be released. 641 auto_save (bool): Whether to automatically save the database after releasing the lock. 642 643 Returns: 644 bool: True if the lock is successfully released and (optionally) saved, False otherwise. 645 """ 646 if lock == self._vault['lock']: 647 self._vault['lock'] = None 648 self.clean_history(lock) 649 if auto_save: 650 return self.save(self.path()) 651 return True 652 return False
Releases the lock on the database.
Parameters: lock (int): The lock ID to be released. auto_save (bool): Whether to automatically save the database after releasing the lock.
Returns: bool: True if the lock is successfully released and (optionally) saved, False otherwise.
654 def account_exists(self, account) -> bool: 655 """ 656 Check if the given account exists in the vault. 657 658 Parameters: 659 account (str): The account number to check. 660 661 Returns: 662 bool: True if the account exists, False otherwise. 663 """ 664 return account in self._vault['account']
Check if the given account exists in the vault.
Parameters: account (str): The account number to check.
Returns: bool: True if the account exists, False otherwise.
666 def box_size(self, account) -> int: 667 """ 668 Calculate the size of the box for a specific account. 669 670 Parameters: 671 account (str): The account number for which the box size needs to be calculated. 672 673 Returns: 674 int: The size of the box for the given account. If the account does not exist, -1 is returned. 675 """ 676 if self.account_exists(account): 677 return len(self._vault['account'][account]['box']) 678 return -1
Calculate the size of the box for a specific account.
Parameters: account (str): The account number for which the box size needs to be calculated.
Returns: int: The size of the box for the given account. If the account does not exist, -1 is returned.
680 def log_size(self, account) -> int: 681 """ 682 Get the size of the log for a specific account. 683 684 Parameters: 685 account (str): The account number for which the log size needs to be calculated. 686 687 Returns: 688 int: The size of the log for the given account. If the account does not exist, -1 is returned. 689 """ 690 if self.account_exists(account): 691 return len(self._vault['account'][account]['log']) 692 return -1
Get the size of the log for a specific account.
Parameters: account (str): The account number for which the log size needs to be calculated.
Returns: int: The size of the log for the given account. If the account does not exist, -1 is returned.
694 @staticmethod 695 def file_hash(file_path: str, algorithm: str = "blake2b") -> str: 696 """ 697 Calculates the hash of a file using the specified algorithm. 698 699 Parameters: 700 file_path (str): The path to the file. 701 algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b". 702 703 Returns: 704 str: The hexadecimal representation of the file's hash. 705 """ 706 hash_obj = hashlib.new(algorithm) # Create the hash object 707 with open(file_path, "rb") as f: # Open file in binary mode for reading 708 for chunk in iter(lambda: f.read(4096), b""): # Read file in chunks 709 hash_obj.update(chunk) 710 return hash_obj.hexdigest() # Return the hash as a hexadecimal string
Calculates the hash of a file using the specified algorithm.
Parameters: file_path (str): The path to the file. algorithm (str, optional): The hashing algorithm to use. Defaults to "blake2b".
Returns: str: The hexadecimal representation of the file's hash.
712 def snapshot_cache_path(self): 713 """ 714 Generate the path for the cache file used to store snapshots. 715 716 The cache file is a camel file that stores the timestamps of the snapshots. 717 The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel". 718 719 Returns: 720 str: The path to the cache file. 721 """ 722 path = str(self.path()) 723 ext = self.ext() 724 ext_len = len(ext) 725 if path.endswith(f'.{ext}'): 726 path = path[:-ext_len-1] 727 _, filename = os.path.split(path + f'.snapshots.{ext}') 728 return self.base_path(filename)
Generate the path for the cache file used to store snapshots.
The cache file is a camel file that stores the timestamps of the snapshots. The file name is derived from the main database file name by replacing the ".camel" extension with ".snapshots.camel".
Returns: str: The path to the cache file.
730 def snapshot(self) -> bool: 731 """ 732 This function creates a snapshot of the current database state. 733 734 The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. 735 If a snapshot with the same hash exists, the function returns True without creating a new snapshot. 736 If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state 737 in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp. 738 739 Parameters: 740 None 741 742 Returns: 743 bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails. 744 """ 745 current_hash = self.file_hash(self.path()) 746 cache: dict[str, int] = {} # hash: time_ns 747 try: 748 with open(self.snapshot_cache_path(), 'r') as stream: 749 cache = camel.load(stream.read()) 750 except: 751 pass 752 if current_hash in cache: 753 return True 754 time = time_ns() 755 cache[current_hash] = time 756 if not self.save(self.base_path('snapshots', f'{time}.{self.ext()}')): 757 return False 758 with open(self.snapshot_cache_path(), 'w') as stream: 759 stream.write(camel.dump(cache)) 760 return True
This function creates a snapshot of the current database state.
The function calculates the hash of the current database file and checks if a snapshot with the same hash already exists. If a snapshot with the same hash exists, the function returns True without creating a new snapshot. If a snapshot with the same hash does not exist, the function creates a new snapshot by saving the current database state in a new camel file with a unique timestamp as the file name. The function also updates the snapshot cache file with the new snapshot's hash and timestamp.
Parameters: None
Returns: bool: True if a snapshot with the same hash already exists or if the snapshot is successfully created. False if the snapshot creation fails.
762 def snapshots(self, hide_missing: bool = True, verified_hash_only: bool = False) \ 763 -> dict[int, tuple[str, str, bool]]: 764 """ 765 Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status. 766 767 Parameters: 768 - hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True. 769 - verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False. 770 771 Returns: 772 - dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, 773 and the values are tuples containing the snapshot's hash, path, and existence status. 774 """ 775 cache: dict[str, int] = {} # hash: time_ns 776 try: 777 with open(self.snapshot_cache_path(), 'r') as stream: 778 cache = camel.load(stream.read()) 779 except: 780 pass 781 if not cache: 782 return {} 783 result: dict[int, tuple[str, str, bool]] = {} # time_ns: (hash, path, exists) 784 for file_hash, ref in cache.items(): 785 path = self.base_path('snapshots', f'{ref}.{self.ext()}') 786 exists = os.path.exists(path) 787 valid_hash = self.file_hash(path) == file_hash if verified_hash_only else True 788 if (verified_hash_only and not valid_hash) or (verified_hash_only and not exists): 789 continue 790 if exists or not hide_missing: 791 result[ref] = (file_hash, path, exists) 792 return result
Retrieve a dictionary of snapshots, with their respective hashes, paths, and existence status.
Parameters:
- hide_missing (bool): If True, only include snapshots that exist in the dictionary. Default is True.
- verified_hash_only (bool): If True, only include snapshots with a valid hash. Default is False.
Returns:
- dict[int, tuple[str, str, bool]]: A dictionary where the keys are the timestamps of the snapshots, and the values are tuples containing the snapshot's hash, path, and existence status.
794 def recall(self, dry=True, debug=False) -> bool: 795 """ 796 Revert the last operation. 797 798 Parameters: 799 dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. 800 debug (bool): If True, the function will print debug information. Default is False. 801 802 Returns: 803 bool: True if the operation was successful, False otherwise. 804 """ 805 if not self.nolock() or len(self._vault['history']) == 0: 806 return False 807 if len(self._vault['history']) <= 0: 808 return False 809 ref = sorted(self._vault['history'].keys())[-1] 810 if debug: 811 print('recall', ref) 812 memory = self._vault['history'][ref] 813 if debug: 814 print(type(memory), 'memory', memory) 815 limit = len(memory) + 1 816 sub_positive_log_negative = 0 817 for i in range(-1, -limit, -1): 818 x = memory[i] 819 if debug: 820 print(type(x), x) 821 match x['action']: 822 case Action.CREATE: 823 if x['account'] is not None: 824 if self.account_exists(x['account']): 825 if debug: 826 print('account', self._vault['account'][x['account']]) 827 assert len(self._vault['account'][x['account']]['box']) == 0 828 assert self._vault['account'][x['account']]['balance'] == 0 829 assert self._vault['account'][x['account']]['count'] == 0 830 if dry: 831 continue 832 del self._vault['account'][x['account']] 833 834 case Action.TRACK: 835 if x['account'] is not None: 836 if self.account_exists(x['account']): 837 if dry: 838 continue 839 self._vault['account'][x['account']]['balance'] -= x['value'] 840 self._vault['account'][x['account']]['count'] -= 1 841 del self._vault['account'][x['account']]['box'][x['ref']] 842 843 case Action.LOG: 844 if x['account'] is not None: 845 if self.account_exists(x['account']): 846 if x['ref'] in self._vault['account'][x['account']]['log']: 847 if dry: 848 continue 849 if sub_positive_log_negative == -x['value']: 850 self._vault['account'][x['account']]['count'] -= 1 851 sub_positive_log_negative = 0 852 box_ref = self._vault['account'][x['account']]['log'][x['ref']]['ref'] 853 if not box_ref is None: 854 assert self.box_exists(x['account'], box_ref) 855 box_value = self._vault['account'][x['account']]['log'][x['ref']]['value'] 856 assert box_value < 0 857 858 try: 859 self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value 860 except TypeError: 861 self._vault['account'][x['account']]['box'][box_ref]['rest'] += Decimal( 862 -box_value) 863 864 try: 865 self._vault['account'][x['account']]['balance'] += -box_value 866 except TypeError: 867 self._vault['account'][x['account']]['balance'] += Decimal(-box_value) 868 869 self._vault['account'][x['account']]['count'] -= 1 870 del self._vault['account'][x['account']]['log'][x['ref']] 871 872 case Action.SUB: 873 if x['account'] is not None: 874 if self.account_exists(x['account']): 875 if x['ref'] in self._vault['account'][x['account']]['box']: 876 if dry: 877 continue 878 self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value'] 879 self._vault['account'][x['account']]['balance'] += x['value'] 880 sub_positive_log_negative = x['value'] 881 882 case Action.ADD_FILE: 883 if x['account'] is not None: 884 if self.account_exists(x['account']): 885 if x['ref'] in self._vault['account'][x['account']]['log']: 886 if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']: 887 if dry: 888 continue 889 del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] 890 891 case Action.REMOVE_FILE: 892 if x['account'] is not None: 893 if self.account_exists(x['account']): 894 if x['ref'] in self._vault['account'][x['account']]['log']: 895 if dry: 896 continue 897 self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value'] 898 899 case Action.BOX_TRANSFER: 900 if x['account'] is not None: 901 if self.account_exists(x['account']): 902 if x['ref'] in self._vault['account'][x['account']]['box']: 903 if dry: 904 continue 905 self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value'] 906 907 case Action.EXCHANGE: 908 if x['account'] is not None: 909 if x['account'] in self._vault['exchange']: 910 if x['ref'] in self._vault['exchange'][x['account']]: 911 if dry: 912 continue 913 del self._vault['exchange'][x['account']][x['ref']] 914 915 case Action.REPORT: 916 if x['ref'] in self._vault['report']: 917 if dry: 918 continue 919 del self._vault['report'][x['ref']] 920 921 case Action.ZAKAT: 922 if x['account'] is not None: 923 if self.account_exists(x['account']): 924 if x['ref'] in self._vault['account'][x['account']]['box']: 925 if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]: 926 if dry: 927 continue 928 match x['math']: 929 case MathOperation.ADDITION: 930 self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x[ 931 'value'] 932 case MathOperation.EQUAL: 933 self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value'] 934 case MathOperation.SUBTRACTION: 935 self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x[ 936 'value'] 937 938 if not dry: 939 del self._vault['history'][ref] 940 return True
Revert the last operation.
Parameters: dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. debug (bool): If True, the function will print debug information. Default is False.
Returns: bool: True if the operation was successful, False otherwise.
942 def ref_exists(self, account: str, ref_type: str, ref: int) -> bool: 943 """ 944 Check if a specific reference (transaction) exists in the vault for a given account and reference type. 945 946 Parameters: 947 account (str): The account number for which to check the existence of the reference. 948 ref_type (str): The type of reference (e.g., 'box', 'log', etc.). 949 ref (int): The reference (transaction) number to check for existence. 950 951 Returns: 952 bool: True if the reference exists for the given account and reference type, False otherwise. 953 """ 954 if account in self._vault['account']: 955 return ref in self._vault['account'][account][ref_type] 956 return False
Check if a specific reference (transaction) exists in the vault for a given account and reference type.
Parameters: account (str): The account number for which to check the existence of the reference. ref_type (str): The type of reference (e.g., 'box', 'log', etc.). ref (int): The reference (transaction) number to check for existence.
Returns: bool: True if the reference exists for the given account and reference type, False otherwise.
958 def box_exists(self, account: str, ref: int) -> bool: 959 """ 960 Check if a specific box (transaction) exists in the vault for a given account and reference. 961 962 Parameters: 963 - account (str): The account number for which to check the existence of the box. 964 - ref (int): The reference (transaction) number to check for existence. 965 966 Returns: 967 - bool: True if the box exists for the given account and reference, False otherwise. 968 """ 969 return self.ref_exists(account, 'box', ref)
Check if a specific box (transaction) exists in the vault for a given account and reference.
Parameters:
- account (str): The account number for which to check the existence of the box.
- ref (int): The reference (transaction) number to check for existence.
Returns:
- bool: True if the box exists for the given account and reference, False otherwise.
971 def track(self, unscaled_value: float | int | Decimal = 0, desc: str = '', account: str = 1, logging: bool = True, 972 created: int = None, 973 debug: bool = False) -> int: 974 """ 975 This function tracks a transaction for a specific account. 976 977 Parameters: 978 unscaled_value (float | int | Decimal): The value of the transaction. Default is 0. 979 desc (str): The description of the transaction. Default is an empty string. 980 account (str): The account for which the transaction is being tracked. Default is '1'. 981 logging (bool): Whether to log the transaction. Default is True. 982 created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. 983 debug (bool): Whether to print debug information. Default is False. 984 985 Returns: 986 int: The timestamp of the transaction. 987 988 This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box. 989 990 Raises: 991 ValueError: The log transaction happened again in the same nanosecond time. 992 ValueError: The box transaction happened again in the same nanosecond time. 993 """ 994 if debug: 995 print('track', f'unscaled_value={unscaled_value}, debug={debug}') 996 if created is None: 997 created = self.time() 998 no_lock = self.nolock() 999 self.lock() 1000 if not self.account_exists(account): 1001 if debug: 1002 print(f"account {account} created") 1003 self._vault['account'][account] = { 1004 'balance': 0, 1005 'box': {}, 1006 'count': 0, 1007 'log': {}, 1008 'hide': False, 1009 'zakatable': True, 1010 } 1011 self._step(Action.CREATE, account) 1012 if unscaled_value == 0: 1013 if no_lock: 1014 self.free(self.lock()) 1015 return 0 1016 value = self.scale(unscaled_value) 1017 if logging: 1018 self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug) 1019 if debug: 1020 print('create-box', created) 1021 if self.box_exists(account, created): 1022 raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).") 1023 if debug: 1024 print('created-box', created) 1025 self._vault['account'][account]['box'][created] = { 1026 'capital': value, 1027 'count': 0, 1028 'last': 0, 1029 'rest': value, 1030 'total': 0, 1031 } 1032 self._step(Action.TRACK, account, ref=created, value=value) 1033 if no_lock: 1034 self.free(self.lock()) 1035 return created
This function tracks a transaction for a specific account.
Parameters: unscaled_value (float | int | Decimal): The value of the transaction. Default is 0. desc (str): The description of the transaction. Default is an empty string. account (str): The account for which the transaction is being tracked. Default is '1'. logging (bool): Whether to log the transaction. Default is True. created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. debug (bool): Whether to print debug information. Default is False.
Returns: int: The timestamp of the transaction.
This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box.
Raises: ValueError: The log transaction happened again in the same nanosecond time. ValueError: The box transaction happened again in the same nanosecond time.
1037 def log_exists(self, account: str, ref: int) -> bool: 1038 """ 1039 Checks if a specific transaction log entry exists for a given account. 1040 1041 Parameters: 1042 account (str): The account number associated with the transaction log. 1043 ref (int): The reference to the transaction log entry. 1044 1045 Returns: 1046 bool: True if the transaction log entry exists, False otherwise. 1047 """ 1048 return self.ref_exists(account, 'log', ref)
Checks if a specific transaction log entry exists for a given account.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry.
Returns: bool: True if the transaction log entry exists, False otherwise.
1094 def exchange(self, account, created: int = None, rate: float = None, description: str = None, 1095 debug: bool = False) -> dict: 1096 """ 1097 This method is used to record or retrieve exchange rates for a specific account. 1098 1099 Parameters: 1100 - account (str): The account number for which the exchange rate is being recorded or retrieved. 1101 - created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used. 1102 - rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate. 1103 - description (str): A description of the exchange rate. 1104 1105 Returns: 1106 - dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, 1107 it returns a dictionary with default values for the rate and description. 1108 """ 1109 if debug: 1110 print('exchange', f'debug={debug}') 1111 if created is None: 1112 created = self.time() 1113 no_lock = self.nolock() 1114 self.lock() 1115 if rate is not None: 1116 if rate <= 0: 1117 return dict() 1118 if account not in self._vault['exchange']: 1119 self._vault['exchange'][account] = {} 1120 if len(self._vault['exchange'][account]) == 0 and rate <= 1: 1121 return {"time": created, "rate": 1, "description": None} 1122 self._vault['exchange'][account][created] = {"rate": rate, "description": description} 1123 self._step(Action.EXCHANGE, account, ref=created, value=rate) 1124 if no_lock: 1125 self.free(self.lock()) 1126 if debug: 1127 print("exchange-created-1", 1128 f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1129 1130 if account in self._vault['exchange']: 1131 valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created] 1132 if valid_rates: 1133 latest_rate = max(valid_rates, key=lambda x: x[0]) 1134 if debug: 1135 print("exchange-read-1", 1136 f'account: {account}, created: {created}, rate:{rate}, description:{description}', 1137 'latest_rate', latest_rate) 1138 result = latest_rate[1] 1139 result['time'] = latest_rate[0] 1140 return result # إرجاع قاموس يحتوي على المعدل والوصف 1141 if debug: 1142 print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}') 1143 return {"time": created, "rate": 1, "description": None} # إرجاع القيمة الافتراضية مع وصف فارغ
This method is used to record or retrieve exchange rates for a specific account.
Parameters:
- account (str): The account number for which the exchange rate is being recorded or retrieved.
- created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used.
- rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate.
- description (str): A description of the exchange rate.
Returns:
- dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, it returns a dictionary with default values for the rate and description.
1145 @staticmethod 1146 def exchange_calc(x: float, x_rate: float, y_rate: float) -> float: 1147 """ 1148 This function calculates the exchanged amount of a currency. 1149 1150 Args: 1151 x (float): The original amount of the currency. 1152 x_rate (float): The exchange rate of the original currency. 1153 y_rate (float): The exchange rate of the target currency. 1154 1155 Returns: 1156 float: The exchanged amount of the target currency. 1157 """ 1158 return (x * x_rate) / y_rate
This function calculates the exchanged amount of a currency.
Args: x (float): The original amount of the currency. x_rate (float): The exchange rate of the original currency. y_rate (float): The exchange rate of the target currency.
Returns: float: The exchanged amount of the target currency.
1160 def exchanges(self) -> dict: 1161 """ 1162 Retrieve the recorded exchange rates for all accounts. 1163 1164 Parameters: 1165 None 1166 1167 Returns: 1168 dict: A dictionary containing all recorded exchange rates. 1169 The keys are account names or numbers, and the values are dictionaries containing the exchange rates. 1170 Each exchange rate dictionary has timestamps as keys and exchange rate details as values. 1171 """ 1172 return self._vault['exchange'].copy()
Retrieve the recorded exchange rates for all accounts.
Parameters: None
Returns: dict: A dictionary containing all recorded exchange rates. The keys are account names or numbers, and the values are dictionaries containing the exchange rates. Each exchange rate dictionary has timestamps as keys and exchange rate details as values.
1174 def accounts(self) -> dict: 1175 """ 1176 Returns a dictionary containing account numbers as keys and their respective balances as values. 1177 1178 Parameters: 1179 None 1180 1181 Returns: 1182 dict: A dictionary where keys are account numbers and values are their respective balances. 1183 """ 1184 result = {} 1185 for i in self._vault['account']: 1186 result[i] = self._vault['account'][i]['balance'] 1187 return result
Returns a dictionary containing account numbers as keys and their respective balances as values.
Parameters: None
Returns: dict: A dictionary where keys are account numbers and values are their respective balances.
1189 def boxes(self, account) -> dict: 1190 """ 1191 Retrieve the boxes (transactions) associated with a specific account. 1192 1193 Parameters: 1194 account (str): The account number for which to retrieve the boxes. 1195 1196 Returns: 1197 dict: A dictionary containing the boxes associated with the given account. 1198 If the account does not exist, an empty dictionary is returned. 1199 """ 1200 if self.account_exists(account): 1201 return self._vault['account'][account]['box'] 1202 return {}
Retrieve the boxes (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the boxes.
Returns: dict: A dictionary containing the boxes associated with the given account. If the account does not exist, an empty dictionary is returned.
1204 def logs(self, account) -> dict: 1205 """ 1206 Retrieve the logs (transactions) associated with a specific account. 1207 1208 Parameters: 1209 account (str): The account number for which to retrieve the logs. 1210 1211 Returns: 1212 dict: A dictionary containing the logs associated with the given account. 1213 If the account does not exist, an empty dictionary is returned. 1214 """ 1215 if self.account_exists(account): 1216 return self._vault['account'][account]['log'] 1217 return {}
Retrieve the logs (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the logs.
Returns: dict: A dictionary containing the logs associated with the given account. If the account does not exist, an empty dictionary is returned.
1219 def add_file(self, account: str, ref: int, path: str) -> int: 1220 """ 1221 Adds a file reference to a specific transaction log entry in the vault. 1222 1223 Parameters: 1224 account (str): The account number associated with the transaction log. 1225 ref (int): The reference to the transaction log entry. 1226 path (str): The path of the file to be added. 1227 1228 Returns: 1229 int: The reference of the added file. If the account or transaction log entry does not exist, returns 0. 1230 """ 1231 if self.account_exists(account): 1232 if ref in self._vault['account'][account]['log']: 1233 file_ref = self.time() 1234 self._vault['account'][account]['log'][ref]['file'][file_ref] = path 1235 no_lock = self.nolock() 1236 self.lock() 1237 self._step(Action.ADD_FILE, account, ref=ref, file=file_ref) 1238 if no_lock: 1239 self.free(self.lock()) 1240 return file_ref 1241 return 0
Adds a file reference to a specific transaction log entry in the vault.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry. path (str): The path of the file to be added.
Returns: int: The reference of the added file. If the account or transaction log entry does not exist, returns 0.
1243 def remove_file(self, account: str, ref: int, file_ref: int) -> bool: 1244 """ 1245 Removes a file reference from a specific transaction log entry in the vault. 1246 1247 Parameters: 1248 account (str): The account number associated with the transaction log. 1249 ref (int): The reference to the transaction log entry. 1250 file_ref (int): The reference of the file to be removed. 1251 1252 Returns: 1253 bool: True if the file reference is successfully removed, False otherwise. 1254 """ 1255 if self.account_exists(account): 1256 if ref in self._vault['account'][account]['log']: 1257 if file_ref in self._vault['account'][account]['log'][ref]['file']: 1258 x = self._vault['account'][account]['log'][ref]['file'][file_ref] 1259 del self._vault['account'][account]['log'][ref]['file'][file_ref] 1260 no_lock = self.nolock() 1261 self.lock() 1262 self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x) 1263 if no_lock: 1264 self.free(self.lock()) 1265 return True 1266 return False
Removes a file reference from a specific transaction log entry in the vault.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry. file_ref (int): The reference of the file to be removed.
Returns: bool: True if the file reference is successfully removed, False otherwise.
1268 def balance(self, account: str = 1, cached: bool = True) -> int: 1269 """ 1270 Calculate and return the balance of a specific account. 1271 1272 Parameters: 1273 account (str): The account number. Default is '1'. 1274 cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True. 1275 1276 Returns: 1277 int: The balance of the account. 1278 1279 Note: 1280 If cached is True, the function returns the cached balance. 1281 If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items. 1282 """ 1283 if cached: 1284 return self._vault['account'][account]['balance'] 1285 x = 0 1286 return [x := x + y['rest'] for y in self._vault['account'][account]['box'].values()][-1]
Calculate and return the balance of a specific account.
Parameters: account (str): The account number. Default is '1'. cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True.
Returns: int: The balance of the account.
Note: If cached is True, the function returns the cached balance. If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items.
1288 def hide(self, account, status: bool = None) -> bool: 1289 """ 1290 Check or set the hide status of a specific account. 1291 1292 Parameters: 1293 account (str): The account number. 1294 status (bool, optional): The new hide status. If not provided, the function will return the current status. 1295 1296 Returns: 1297 bool: The current or updated hide status of the account. 1298 1299 Raises: 1300 None 1301 1302 Example: 1303 >>> tracker = ZakatTracker() 1304 >>> ref = tracker.track(51, 'desc', 'account1') 1305 >>> tracker.hide('account1') # Set the hide status of 'account1' to True 1306 False 1307 >>> tracker.hide('account1', True) # Set the hide status of 'account1' to True 1308 True 1309 >>> tracker.hide('account1') # Get the hide status of 'account1' by default 1310 True 1311 >>> tracker.hide('account1', False) 1312 False 1313 """ 1314 if self.account_exists(account): 1315 if status is None: 1316 return self._vault['account'][account]['hide'] 1317 self._vault['account'][account]['hide'] = status 1318 return status 1319 return False
Check or set the hide status of a specific account.
Parameters: account (str): The account number. status (bool, optional): The new hide status. If not provided, the function will return the current status.
Returns: bool: The current or updated hide status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.hide('account1') # Set the hide status of 'account1' to True
False
>>> tracker.hide('account1', True) # Set the hide status of 'account1' to True
True
>>> tracker.hide('account1') # Get the hide status of 'account1' by default
True
>>> tracker.hide('account1', False)
False
1321 def zakatable(self, account, status: bool = None) -> bool: 1322 """ 1323 Check or set the zakatable status of a specific account. 1324 1325 Parameters: 1326 account (str): The account number. 1327 status (bool, optional): The new zakatable status. If not provided, the function will return the current status. 1328 1329 Returns: 1330 bool: The current or updated zakatable status of the account. 1331 1332 Raises: 1333 None 1334 1335 Example: 1336 >>> tracker = ZakatTracker() 1337 >>> ref = tracker.track(51, 'desc', 'account1') 1338 >>> tracker.zakatable('account1') # Set the zakatable status of 'account1' to True 1339 True 1340 >>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True 1341 True 1342 >>> tracker.zakatable('account1') # Get the zakatable status of 'account1' by default 1343 True 1344 >>> tracker.zakatable('account1', False) 1345 False 1346 """ 1347 if self.account_exists(account): 1348 if status is None: 1349 return self._vault['account'][account]['zakatable'] 1350 self._vault['account'][account]['zakatable'] = status 1351 return status 1352 return False
Check or set the zakatable status of a specific account.
Parameters: account (str): The account number. status (bool, optional): The new zakatable status. If not provided, the function will return the current status.
Returns: bool: The current or updated zakatable status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.zakatable('account1') # Set the zakatable status of 'account1' to True
True
>>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True
True
>>> tracker.zakatable('account1') # Get the zakatable status of 'account1' by default
True
>>> tracker.zakatable('account1', False)
False
1354 def sub(self, unscaled_value: float | int | Decimal, desc: str = '', account: str = 1, created: int = None, 1355 debug: bool = False) \ 1356 -> tuple[ 1357 int, 1358 list[ 1359 tuple[int, int], 1360 ], 1361 ] | tuple: 1362 """ 1363 Subtracts a specified value from an account's balance. 1364 1365 Parameters: 1366 unscaled_value (float | int | Decimal): The amount to be subtracted. 1367 desc (str): A description for the transaction. Defaults to an empty string. 1368 account (str): The account from which the value will be subtracted. Defaults to '1'. 1369 created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. 1370 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1371 1372 Returns: 1373 tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction. 1374 1375 If the amount to subtract is greater than the account's balance, 1376 the remaining amount will be transferred to a new transaction with a negative value. 1377 1378 Raises: 1379 ValueError: The box transaction happened again in the same nanosecond time. 1380 ValueError: The log transaction happened again in the same nanosecond time. 1381 """ 1382 if debug: 1383 print('sub', f'debug={debug}') 1384 if unscaled_value < 0: 1385 return tuple() 1386 if unscaled_value == 0: 1387 ref = self.track(unscaled_value, '', account) 1388 return ref, ref 1389 if created is None: 1390 created = self.time() 1391 no_lock = self.nolock() 1392 self.lock() 1393 self.track(0, '', account) 1394 value = self.scale(unscaled_value) 1395 self._log(value=-value, desc=desc, account=account, created=created, ref=None, debug=debug) 1396 ids = sorted(self._vault['account'][account]['box'].keys()) 1397 limit = len(ids) + 1 1398 target = value 1399 if debug: 1400 print('ids', ids) 1401 ages = [] 1402 for i in range(-1, -limit, -1): 1403 if target == 0: 1404 break 1405 j = ids[i] 1406 if debug: 1407 print('i', i, 'j', j) 1408 rest = self._vault['account'][account]['box'][j]['rest'] 1409 if rest >= target: 1410 self._vault['account'][account]['box'][j]['rest'] -= target 1411 self._step(Action.SUB, account, ref=j, value=target) 1412 ages.append((j, target)) 1413 target = 0 1414 break 1415 elif target > rest > 0: 1416 chunk = rest 1417 target -= chunk 1418 self._step(Action.SUB, account, ref=j, value=chunk) 1419 ages.append((j, chunk)) 1420 self._vault['account'][account]['box'][j]['rest'] = 0 1421 if target > 0: 1422 self.track( 1423 unscaled_value=self.unscale(-target), 1424 desc=desc, 1425 account=account, 1426 logging=False, 1427 created=created, 1428 ) 1429 ages.append((created, target)) 1430 if no_lock: 1431 self.free(self.lock()) 1432 return created, ages
Subtracts a specified value from an account's balance.
Parameters: unscaled_value (float | int | Decimal): The amount to be subtracted. desc (str): A description for the transaction. Defaults to an empty string. account (str): The account from which the value will be subtracted. Defaults to '1'. created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction.
If the amount to subtract is greater than the account's balance, the remaining amount will be transferred to a new transaction with a negative value.
Raises: ValueError: The box transaction happened again in the same nanosecond time. ValueError: The log transaction happened again in the same nanosecond time.
1434 def transfer(self, unscaled_amount: float | int | Decimal, from_account: str, to_account: str, desc: str = '', 1435 created: int = None, 1436 debug: bool = False) -> list[int]: 1437 """ 1438 Transfers a specified value from one account to another. 1439 1440 Parameters: 1441 unscaled_amount (float | int | Decimal): The amount to be transferred. 1442 from_account (str): The account from which the value will be transferred. 1443 to_account (str): The account to which the value will be transferred. 1444 desc (str, optional): A description for the transaction. Defaults to an empty string. 1445 created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. 1446 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1447 1448 Returns: 1449 list[int]: A list of timestamps corresponding to the transactions made during the transfer. 1450 1451 Raises: 1452 ValueError: Transfer to the same account is forbidden. 1453 ValueError: The box transaction happened again in the same nanosecond time. 1454 ValueError: The log transaction happened again in the same nanosecond time. 1455 """ 1456 if debug: 1457 print('transfer', f'debug={debug}') 1458 if from_account == to_account: 1459 raise ValueError(f'Transfer to the same account is forbidden. {to_account}') 1460 if unscaled_amount <= 0: 1461 return [] 1462 if created is None: 1463 created = self.time() 1464 (_, ages) = self.sub(unscaled_amount, desc, from_account, created, debug=debug) 1465 times = [] 1466 source_exchange = self.exchange(from_account, created) 1467 target_exchange = self.exchange(to_account, created) 1468 1469 if debug: 1470 print('ages', ages) 1471 1472 for age, value in ages: 1473 target_amount = int(self.exchange_calc(value, source_exchange['rate'], target_exchange['rate'])) 1474 if debug: 1475 print('target_amount', target_amount) 1476 # Perform the transfer 1477 if self.box_exists(to_account, age): 1478 if debug: 1479 print('box_exists', age) 1480 capital = self._vault['account'][to_account]['box'][age]['capital'] 1481 rest = self._vault['account'][to_account]['box'][age]['rest'] 1482 if debug: 1483 print( 1484 f"Transfer(loop) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1485 selected_age = age 1486 if rest + target_amount > capital: 1487 self._vault['account'][to_account]['box'][age]['capital'] += target_amount 1488 selected_age = ZakatTracker.time() 1489 self._vault['account'][to_account]['box'][age]['rest'] += target_amount 1490 self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount) 1491 y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account, 1492 created=None, ref=None, debug=debug) 1493 times.append((age, y)) 1494 continue 1495 if debug: 1496 print( 1497 f"Transfer(func) {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1498 y = self.track( 1499 unscaled_value=self.unscale(int(target_amount)), 1500 desc=desc, 1501 account=to_account, 1502 logging=True, 1503 created=age, 1504 debug=debug, 1505 ) 1506 times.append(y) 1507 return times
Transfers a specified value from one account to another.
Parameters: unscaled_amount (float | int | Decimal): The amount to be transferred. from_account (str): The account from which the value will be transferred. to_account (str): The account to which the value will be transferred. desc (str, optional): A description for the transaction. Defaults to an empty string. created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: list[int]: A list of timestamps corresponding to the transactions made during the transfer.
Raises: ValueError: Transfer to the same account is forbidden. ValueError: The box transaction happened again in the same nanosecond time. ValueError: The log transaction happened again in the same nanosecond time.
1509 def check(self, silver_gram_price: float, unscaled_nisab: float | int | Decimal = None, debug: bool = False, now: int = None, 1510 cycle: float = None) -> tuple: 1511 """ 1512 Check the eligibility for Zakat based on the given parameters. 1513 1514 Parameters: 1515 silver_gram_price (float): The price of a gram of silver. 1516 unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, 1517 it will be calculated based on the silver_gram_price. 1518 debug (bool): Flag to enable debug mode. 1519 now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). 1520 cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle(). 1521 1522 Returns: 1523 tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, 1524 and a dictionary containing the Zakat plan. 1525 """ 1526 if debug: 1527 print('check', f'debug={debug}') 1528 if now is None: 1529 now = self.time() 1530 if cycle is None: 1531 cycle = ZakatTracker.TimeCycle() 1532 if unscaled_nisab is None: 1533 unscaled_nisab = ZakatTracker.Nisab(silver_gram_price) 1534 nisab = self.scale(unscaled_nisab) 1535 plan = {} 1536 below_nisab = 0 1537 brief = [0, 0, 0] 1538 valid = False 1539 if debug: 1540 print('exchanges', self.exchanges()) 1541 for x in self._vault['account']: 1542 if not self.zakatable(x): 1543 continue 1544 _box = self._vault['account'][x]['box'] 1545 _log = self._vault['account'][x]['log'] 1546 limit = len(_box) + 1 1547 ids = sorted(self._vault['account'][x]['box'].keys()) 1548 for i in range(-1, -limit, -1): 1549 j = ids[i] 1550 rest = float(_box[j]['rest']) 1551 if rest <= 0: 1552 continue 1553 exchange = self.exchange(x, created=self.time()) 1554 rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1) 1555 brief[0] += rest 1556 index = limit + i - 1 1557 epoch = (now - j) / cycle 1558 if debug: 1559 print(f"Epoch: {epoch}", _box[j]) 1560 if _box[j]['last'] > 0: 1561 epoch = (now - _box[j]['last']) / cycle 1562 if debug: 1563 print(f"Epoch: {epoch}") 1564 epoch = floor(epoch) 1565 if debug: 1566 print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch) 1567 if epoch == 0: 1568 continue 1569 if debug: 1570 print("Epoch - PASSED") 1571 brief[1] += rest 1572 if rest >= nisab: 1573 total = 0 1574 for _ in range(epoch): 1575 total += ZakatTracker.ZakatCut(float(rest) - float(total)) 1576 if total > 0: 1577 if x not in plan: 1578 plan[x] = {} 1579 valid = True 1580 brief[2] += total 1581 plan[x][index] = { 1582 'total': total, 1583 'count': epoch, 1584 'box_time': j, 1585 'box_capital': _box[j]['capital'], 1586 'box_rest': _box[j]['rest'], 1587 'box_last': _box[j]['last'], 1588 'box_total': _box[j]['total'], 1589 'box_count': _box[j]['count'], 1590 'box_log': _log[j]['desc'], 1591 'exchange_rate': exchange['rate'], 1592 'exchange_time': exchange['time'], 1593 'exchange_desc': exchange['description'], 1594 } 1595 else: 1596 chunk = ZakatTracker.ZakatCut(float(rest)) 1597 if chunk > 0: 1598 if x not in plan: 1599 plan[x] = {} 1600 if j not in plan[x].keys(): 1601 plan[x][index] = {} 1602 below_nisab += rest 1603 brief[2] += chunk 1604 plan[x][index]['below_nisab'] = chunk 1605 plan[x][index]['total'] = chunk 1606 plan[x][index]['count'] = epoch 1607 plan[x][index]['box_time'] = j 1608 plan[x][index]['box_capital'] = _box[j]['capital'] 1609 plan[x][index]['box_rest'] = _box[j]['rest'] 1610 plan[x][index]['box_last'] = _box[j]['last'] 1611 plan[x][index]['box_total'] = _box[j]['total'] 1612 plan[x][index]['box_count'] = _box[j]['count'] 1613 plan[x][index]['box_log'] = _log[j]['desc'] 1614 plan[x][index]['exchange_rate'] = exchange['rate'] 1615 plan[x][index]['exchange_time'] = exchange['time'] 1616 plan[x][index]['exchange_desc'] = exchange['description'] 1617 valid = valid or below_nisab >= nisab 1618 if debug: 1619 print(f"below_nisab({below_nisab}) >= nisab({nisab})") 1620 return valid, brief, plan
Check the eligibility for Zakat based on the given parameters.
Parameters: silver_gram_price (float): The price of a gram of silver. unscaled_nisab (float | int | Decimal): The minimum amount of wealth required for Zakat. If not provided, it will be calculated based on the silver_gram_price. debug (bool): Flag to enable debug mode. now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle().
Returns: tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, and a dictionary containing the Zakat plan.
1622 def build_payment_parts(self, scaled_demand: int, positive_only: bool = True) -> dict: 1623 """ 1624 Build payment parts for the Zakat distribution. 1625 1626 Parameters: 1627 scaled_demand (int): The total demand for payment in local currency. 1628 positive_only (bool): If True, only consider accounts with positive balance. Default is True. 1629 1630 Returns: 1631 dict: A dictionary containing the payment parts for each account. The dictionary has the following structure: 1632 { 1633 'account': { 1634 'account_id': {'balance': float, 'rate': float, 'part': float}, 1635 ... 1636 }, 1637 'exceed': bool, 1638 'demand': int, 1639 'total': float, 1640 } 1641 """ 1642 total = 0 1643 parts = { 1644 'account': {}, 1645 'exceed': False, 1646 'demand': int(round(scaled_demand)), 1647 } 1648 for x, y in self.accounts().items(): 1649 if positive_only and y <= 0: 1650 continue 1651 total += float(y) 1652 exchange = self.exchange(x) 1653 parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0} 1654 parts['total'] = total 1655 return parts
Build payment parts for the Zakat distribution.
Parameters: scaled_demand (int): The total demand for payment in local currency. positive_only (bool): If True, only consider accounts with positive balance. Default is True.
Returns: dict: A dictionary containing the payment parts for each account. The dictionary has the following structure: { 'account': { 'account_id': {'balance': float, 'rate': float, 'part': float}, ... }, 'exceed': bool, 'demand': int, 'total': float, }
1657 @staticmethod 1658 def check_payment_parts(parts: dict, debug: bool = False) -> int: 1659 """ 1660 Checks the validity of payment parts. 1661 1662 Parameters: 1663 parts (dict): A dictionary containing payment parts information. 1664 debug (bool): Flag to enable debug mode. 1665 1666 Returns: 1667 int: Returns 0 if the payment parts are valid, otherwise returns the error code. 1668 1669 Error Codes: 1670 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 1671 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 1672 3: 'part' value in parts['account'][x] is less than 0. 1673 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 1674 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 1675 6: The sum of 'part' values in parts['account'] does not match with 'demand' value. 1676 """ 1677 if debug: 1678 print('check_payment_parts', f'debug={debug}') 1679 for i in ['demand', 'account', 'total', 'exceed']: 1680 if i not in parts: 1681 return 1 1682 exceed = parts['exceed'] 1683 for x in parts['account']: 1684 for j in ['balance', 'rate', 'part']: 1685 if j not in parts['account'][x]: 1686 return 2 1687 if parts['account'][x]['part'] < 0: 1688 return 3 1689 if not exceed and parts['account'][x]['balance'] <= 0: 1690 return 4 1691 demand = parts['demand'] 1692 z = 0 1693 for _, y in parts['account'].items(): 1694 if not exceed and y['part'] > y['balance']: 1695 return 5 1696 z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1) 1697 z = round(z, 2) 1698 demand = round(demand, 2) 1699 if debug: 1700 print('check_payment_parts', f'z = {z}, demand = {demand}') 1701 print('check_payment_parts', type(z), type(demand)) 1702 print('check_payment_parts', z != demand) 1703 print('check_payment_parts', str(z) != str(demand)) 1704 if z != demand and str(z) != str(demand): 1705 return 6 1706 return 0
Checks the validity of payment parts.
Parameters: parts (dict): A dictionary containing payment parts information. debug (bool): Flag to enable debug mode.
Returns: int: Returns 0 if the payment parts are valid, otherwise returns the error code.
Error Codes: 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 3: 'part' value in parts['account'][x] is less than 0. 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 6: The sum of 'part' values in parts['account'] does not match with 'demand' value.
1708 def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool: 1709 """ 1710 Perform Zakat calculation based on the given report and optional parts. 1711 1712 Parameters: 1713 report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. 1714 parts (dict): A dictionary containing the payment parts for the zakat. 1715 debug (bool): A flag indicating whether to print debug information. 1716 1717 Returns: 1718 bool: True if the zakat calculation is successful, False otherwise. 1719 """ 1720 if debug: 1721 print('zakat', f'debug={debug}') 1722 valid, _, plan = report 1723 if not valid: 1724 return valid 1725 parts_exist = parts is not None 1726 if parts_exist: 1727 if self.check_payment_parts(parts, debug=debug) != 0: 1728 return False 1729 if debug: 1730 print('######### zakat #######') 1731 print('parts_exist', parts_exist) 1732 no_lock = self.nolock() 1733 self.lock() 1734 report_time = self.time() 1735 self._vault['report'][report_time] = report 1736 self._step(Action.REPORT, ref=report_time) 1737 created = self.time() 1738 for x in plan: 1739 target_exchange = self.exchange(x) 1740 if debug: 1741 print(plan[x]) 1742 print('-------------') 1743 print(self._vault['account'][x]['box']) 1744 ids = sorted(self._vault['account'][x]['box'].keys()) 1745 if debug: 1746 print('plan[x]', plan[x]) 1747 for i in plan[x].keys(): 1748 j = ids[i] 1749 if debug: 1750 print('i', i, 'j', j) 1751 self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'], 1752 key='last', 1753 math_operation=MathOperation.EQUAL) 1754 self._vault['account'][x]['box'][j]['last'] = created 1755 amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate'])) 1756 self._vault['account'][x]['box'][j]['total'] += amount 1757 self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total', 1758 math_operation=MathOperation.ADDITION) 1759 self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count'] 1760 self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count', 1761 math_operation=MathOperation.ADDITION) 1762 if not parts_exist: 1763 try: 1764 self._vault['account'][x]['box'][j]['rest'] -= amount 1765 except TypeError: 1766 self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount) 1767 # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest', 1768 # math_operation=MathOperation.SUBTRACTION) 1769 self._log(-float(amount), desc='zakat-زكاة', account=x, created=None, ref=j, debug=debug) 1770 if parts_exist: 1771 for account, part in parts['account'].items(): 1772 if part['part'] == 0: 1773 continue 1774 if debug: 1775 print('zakat-part', account, part['rate']) 1776 target_exchange = self.exchange(account) 1777 amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate']) 1778 self.sub( 1779 unscaled_value=self.unscale(amount), 1780 desc='zakat-part-دفعة-زكاة', 1781 account=account, 1782 debug=debug, 1783 ) 1784 if no_lock: 1785 self.free(self.lock()) 1786 return True
Perform Zakat calculation based on the given report and optional parts.
Parameters: report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. parts (dict): A dictionary containing the payment parts for the zakat. debug (bool): A flag indicating whether to print debug information.
Returns: bool: True if the zakat calculation is successful, False otherwise.
1788 def export_json(self, path: str = "data.json") -> bool: 1789 """ 1790 Exports the current state of the ZakatTracker object to a JSON file. 1791 1792 Parameters: 1793 path (str): The path where the JSON file will be saved. Default is "data.json". 1794 1795 Returns: 1796 bool: True if the export is successful, False otherwise. 1797 1798 Raises: 1799 No specific exceptions are raised by this method. 1800 """ 1801 with open(path, "w") as file: 1802 json.dump(self._vault, file, indent=4, cls=JSONEncoder) 1803 return True
Exports the current state of the ZakatTracker object to a JSON file.
Parameters: path (str): The path where the JSON file will be saved. Default is "data.json".
Returns: bool: True if the export is successful, False otherwise.
Raises: No specific exceptions are raised by this method.
1805 def save(self, path: str = None) -> bool: 1806 """ 1807 Saves the ZakatTracker's current state to a camel file. 1808 1809 This method serializes the internal data (`_vault`). 1810 1811 Parameters: 1812 path (str, optional): File path for saving. Defaults to a predefined location. 1813 1814 Returns: 1815 bool: True if the save operation is successful, False otherwise. 1816 """ 1817 if path is None: 1818 path = self.path() 1819 with open(f'{path}.tmp', 'w') as stream: 1820 # first save in tmp file 1821 stream.write(camel.dump(self._vault)) 1822 # then move tmp file to original location 1823 shutil.move(f'{path}.tmp', path) 1824 return True
Saves the ZakatTracker's current state to a camel file.
This method serializes the internal data (_vault
).
Parameters: path (str, optional): File path for saving. Defaults to a predefined location.
Returns: bool: True if the save operation is successful, False otherwise.
1826 def load(self, path: str = None) -> bool: 1827 """ 1828 Load the current state of the ZakatTracker object from a camel file. 1829 1830 Parameters: 1831 path (str): The path where the camel file is located. If not provided, it will use the default path. 1832 1833 Returns: 1834 bool: True if the load operation is successful, False otherwise. 1835 """ 1836 if path is None: 1837 path = self.path() 1838 if os.path.exists(path): 1839 with open(path, 'r') as stream: 1840 self._vault = camel.load(stream.read()) 1841 return True 1842 return False
Load the current state of the ZakatTracker object from a camel file.
Parameters: path (str): The path where the camel file is located. If not provided, it will use the default path.
Returns: bool: True if the load operation is successful, False otherwise.
1844 def import_csv_cache_path(self): 1845 """ 1846 Generates the cache file path for imported CSV data. 1847 1848 This function constructs the file path where cached data from CSV imports 1849 will be stored. The cache file is a camel file (.camel extension) appended 1850 to the base path of the object. 1851 1852 Returns: 1853 str: The full path to the import CSV cache file. 1854 1855 Example: 1856 >>> obj = ZakatTracker('/data/reports') 1857 >>> obj.import_csv_cache_path() 1858 '/data/reports.import_csv.camel' 1859 """ 1860 path = str(self.path()) 1861 ext = self.ext() 1862 ext_len = len(ext) 1863 if path.endswith(f'.{ext}'): 1864 path = path[:-ext_len-1] 1865 _, filename = os.path.split(path + f'.import_csv.{ext}') 1866 return self.base_path(filename)
Generates the cache file path for imported CSV data.
This function constructs the file path where cached data from CSV imports will be stored. The cache file is a camel file (.camel extension) appended to the base path of the object.
Returns: str: The full path to the import CSV cache file.
Example:
obj = ZakatTracker('/data/reports') obj.import_csv_cache_path() '/data/reports.import_csv.camel'
1868 def import_csv(self, path: str = 'file.csv', scale_decimal_places: int = 0, debug: bool = False) -> tuple: 1869 """ 1870 The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system. 1871 1872 Parameters: 1873 path (str): The path to the CSV file. Default is 'file.csv'. 1874 scale_decimal_places (int): The number of decimal places to scale the value. Default is 0. 1875 debug (bool): A flag indicating whether to print debug information. 1876 1877 Returns: 1878 tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, 1879 and a dictionary of bad transactions. 1880 1881 Notes: 1882 * Currency Pair Assumption: This function assumes that the exchange rates stored for each account 1883 are appropriate for the currency pairs involved in the conversions. 1884 * The exchange rate for each account is based on the last encountered transaction rate that is not equal 1885 to 1.0 or the previous rate for that account. 1886 * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent 1887 transactions of the same account within the whole imported and existing dataset when doing `check` and 1888 `zakat` operations. 1889 1890 Example Usage: 1891 The CSV file should have the following format, rate is optional per transaction: 1892 account, desc, value, date, rate 1893 For example: 1894 safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1 1895 """ 1896 if debug: 1897 print('import_csv', f'debug={debug}') 1898 cache: list[int] = [] 1899 try: 1900 with open(self.import_csv_cache_path(), 'r') as stream: 1901 cache = camel.load(stream.read()) 1902 except: 1903 pass 1904 date_formats = [ 1905 "%Y-%m-%d %H:%M:%S", 1906 "%Y-%m-%dT%H:%M:%S", 1907 "%Y-%m-%dT%H%M%S", 1908 "%Y-%m-%d", 1909 ] 1910 created, found, bad = 0, 0, {} 1911 data: dict[int, list] = {} 1912 with open(path, newline='', encoding="utf-8") as f: 1913 i = 0 1914 for row in csv.reader(f, delimiter=','): 1915 i += 1 1916 hashed = hash(tuple(row)) 1917 if hashed in cache: 1918 found += 1 1919 continue 1920 account = row[0] 1921 desc = row[1] 1922 value = float(row[2]) 1923 rate = 1.0 1924 if row[4:5]: # Empty list if index is out of range 1925 rate = float(row[4]) 1926 date: int = 0 1927 for time_format in date_formats: 1928 try: 1929 date = self.time(datetime.datetime.strptime(row[3], time_format)) 1930 break 1931 except: 1932 pass 1933 # TODO: not allowed for negative dates in the future after enhance time functions 1934 if date == 0: 1935 bad[i] = row + ['invalid date'] 1936 if value == 0: 1937 bad[i] = row + ['invalid value'] 1938 continue 1939 if date not in data: 1940 data[date] = [] 1941 data[date].append((i, account, desc, value, date, rate, hashed)) 1942 1943 if debug: 1944 print('import_csv', len(data)) 1945 1946 if bad: 1947 return created, found, bad 1948 1949 for date, rows in sorted(data.items()): 1950 try: 1951 len_rows = len(rows) 1952 if len_rows == 1: 1953 (_, account, desc, unscaled_value, date, rate, hashed) = rows[0] 1954 value = self.unscale(unscaled_value, decimal_places=scale_decimal_places) if scale_decimal_places > 0 else unscaled_value 1955 if rate > 0: 1956 self.exchange(account=account, created=date, rate=rate) 1957 if value > 0: 1958 self.track(unscaled_value=value, desc=desc, account=account, logging=True, created=date) 1959 elif value < 0: 1960 self.sub(unscaled_value=-value, desc=desc, account=account, created=date) 1961 created += 1 1962 cache.append(hashed) 1963 continue 1964 if debug: 1965 print('-- Duplicated time detected', date, 'len', len_rows) 1966 print(rows) 1967 print('---------------------------------') 1968 # If records are found at the same time with different accounts in the same amount 1969 # (one positive and the other negative), this indicates it is a transfer. 1970 if len_rows != 2: 1971 raise Exception(f'more than two transactions({len_rows}) at the same time') 1972 (i, account1, desc1, unscaled_value1, date1, rate1, _) = rows[0] 1973 (j, account2, desc2, unscaled_value2, date2, rate2, _) = rows[1] 1974 if account1 == account2 or desc1 != desc2 or abs(unscaled_value1) != abs(unscaled_value2) or date1 != date2: 1975 raise Exception('invalid transfer') 1976 if rate1 > 0: 1977 self.exchange(account1, created=date1, rate=rate1) 1978 if rate2 > 0: 1979 self.exchange(account2, created=date2, rate=rate2) 1980 value1 = self.unscale(unscaled_value1, decimal_places=scale_decimal_places) if scale_decimal_places > 0 else unscaled_value1 1981 value2 = self.unscale(unscaled_value2, decimal_places=scale_decimal_places) if scale_decimal_places > 0 else unscaled_value2 1982 values = { 1983 value1: account1, 1984 value2: account2, 1985 } 1986 self.transfer( 1987 unscaled_amount=abs(value1), 1988 from_account=values[min(values.keys())], 1989 to_account=values[max(values.keys())], 1990 desc=desc1, 1991 created=date1, 1992 ) 1993 except Exception as e: 1994 for (i, account, desc, value, date, rate, _) in rows: 1995 bad[i] = (account, desc, value, date, rate, e) 1996 break 1997 with open(self.import_csv_cache_path(), 'w') as stream: 1998 stream.write(camel.dump(cache)) 1999 return created, found, bad
The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system.
Parameters: path (str): The path to the CSV file. Default is 'file.csv'. scale_decimal_places (int): The number of decimal places to scale the value. Default is 0. debug (bool): A flag indicating whether to print debug information.
Returns: tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, and a dictionary of bad transactions.
Notes:
* Currency Pair Assumption: This function assumes that the exchange rates stored for each account
are appropriate for the currency pairs involved in the conversions.
* The exchange rate for each account is based on the last encountered transaction rate that is not equal
to 1.0 or the previous rate for that account.
* Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent
transactions of the same account within the whole imported and existing dataset when doing check
and
zakat
operations.
Example Usage: The CSV file should have the following format, rate is optional per transaction: account, desc, value, date, rate For example: safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1
2005 @staticmethod 2006 def human_readable_size(size: float, decimal_places: int = 2) -> str: 2007 """ 2008 Converts a size in bytes to a human-readable format (e.g., KB, MB, GB). 2009 2010 This function iterates through progressively larger units of information 2011 (B, KB, MB, GB, etc.) and divides the input size until it fits within a 2012 range that can be expressed with a reasonable number before the unit. 2013 2014 Parameters: 2015 size (float): The size in bytes to convert. 2016 decimal_places (int, optional): The number of decimal places to display 2017 in the result. Defaults to 2. 2018 2019 Returns: 2020 str: A string representation of the size in a human-readable format, 2021 rounded to the specified number of decimal places. For example: 2022 - "1.50 KB" (1536 bytes) 2023 - "23.00 MB" (24117248 bytes) 2024 - "1.23 GB" (1325899906 bytes) 2025 """ 2026 if type(size) not in (float, int): 2027 raise TypeError("size must be a float or integer") 2028 if type(decimal_places) != int: 2029 raise TypeError("decimal_places must be an integer") 2030 for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: 2031 if size < 1024.0: 2032 break 2033 size /= 1024.0 2034 return f"{size:.{decimal_places}f} {unit}"
Converts a size in bytes to a human-readable format (e.g., KB, MB, GB).
This function iterates through progressively larger units of information (B, KB, MB, GB, etc.) and divides the input size until it fits within a range that can be expressed with a reasonable number before the unit.
Parameters: size (float): The size in bytes to convert. decimal_places (int, optional): The number of decimal places to display in the result. Defaults to 2.
Returns: str: A string representation of the size in a human-readable format, rounded to the specified number of decimal places. For example: - "1.50 KB" (1536 bytes) - "23.00 MB" (24117248 bytes) - "1.23 GB" (1325899906 bytes)
2036 @staticmethod 2037 def get_dict_size(obj: dict, seen: set = None) -> float: 2038 """ 2039 Recursively calculates the approximate memory size of a dictionary and its contents in bytes. 2040 2041 This function traverses the dictionary structure, accounting for the size of keys, values, 2042 and any nested objects. It handles various data types commonly found in dictionaries 2043 (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case 2044 of circular references. 2045 2046 Parameters: 2047 obj (dict): The dictionary whose size is to be calculated. 2048 seen (set, optional): A set used internally to track visited objects 2049 and avoid circular references. Defaults to None. 2050 2051 Returns: 2052 float: An approximate size of the dictionary and its contents in bytes. 2053 2054 Note: 2055 - This function is a method of the `ZakatTracker` class and is likely used to 2056 estimate the memory footprint of data structures relevant to Zakat calculations. 2057 - The size calculation is approximate as it relies on `sys.getsizeof()`, which might 2058 not account for all memory overhead depending on the Python implementation. 2059 - Circular references are handled to prevent infinite recursion. 2060 - Basic numeric types (int, float, complex) are assumed to have fixed sizes. 2061 - String sizes are estimated based on character length and encoding. 2062 """ 2063 size = 0 2064 if seen is None: 2065 seen = set() 2066 2067 obj_id = id(obj) 2068 if obj_id in seen: 2069 return 0 2070 2071 seen.add(obj_id) 2072 size += sys.getsizeof(obj) 2073 2074 if isinstance(obj, dict): 2075 for k, v in obj.items(): 2076 size += ZakatTracker.get_dict_size(k, seen) 2077 size += ZakatTracker.get_dict_size(v, seen) 2078 elif isinstance(obj, (list, tuple, set, frozenset)): 2079 for item in obj: 2080 size += ZakatTracker.get_dict_size(item, seen) 2081 elif isinstance(obj, (int, float, complex)): # Handle numbers 2082 pass # Basic numbers have a fixed size, so nothing to add here 2083 elif isinstance(obj, str): # Handle strings 2084 size += len(obj) * sys.getsizeof(str().encode()) # Size per character in bytes 2085 return size
Recursively calculates the approximate memory size of a dictionary and its contents in bytes.
This function traverses the dictionary structure, accounting for the size of keys, values, and any nested objects. It handles various data types commonly found in dictionaries (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case of circular references.
Parameters: obj (dict): The dictionary whose size is to be calculated. seen (set, optional): A set used internally to track visited objects and avoid circular references. Defaults to None.
Returns: float: An approximate size of the dictionary and its contents in bytes.
Note:
- This function is a method of the
ZakatTracker
class and is likely used to estimate the memory footprint of data structures relevant to Zakat calculations. - The size calculation is approximate as it relies on
sys.getsizeof()
, which might not account for all memory overhead depending on the Python implementation. - Circular references are handled to prevent infinite recursion.
- Basic numeric types (int, float, complex) are assumed to have fixed sizes.
- String sizes are estimated based on character length and encoding.
2087 @staticmethod 2088 def duration_from_nanoseconds(ns: int, 2089 show_zeros_in_spoken_time: bool = False, 2090 spoken_time_separator=',', 2091 millennia: str = 'Millennia', 2092 century: str = 'Century', 2093 years: str = 'Years', 2094 days: str = 'Days', 2095 hours: str = 'Hours', 2096 minutes: str = 'Minutes', 2097 seconds: str = 'Seconds', 2098 milli_seconds: str = 'MilliSeconds', 2099 micro_seconds: str = 'MicroSeconds', 2100 nano_seconds: str = 'NanoSeconds', 2101 ) -> tuple: 2102 """ 2103 REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 2104 Convert NanoSeconds to Human Readable Time Format. 2105 A NanoSeconds is a unit of time in the International System of Units (SI) equal 2106 to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second. 2107 Its symbol is μs, sometimes simplified to us when Unicode is not available. 2108 A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond. 2109 2110 INPUT : ms (AKA: MilliSeconds) 2111 OUTPUT: tuple(string time_lapsed, string spoken_time) like format. 2112 OUTPUT Variables: time_lapsed, spoken_time 2113 2114 Example Input: duration_from_nanoseconds(ns) 2115 **"Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds"** 2116 Example Output: ('039:0001:047:325:05:02:03:456:789:012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds') 2117 duration_from_nanoseconds(1234567890123456789012) 2118 """ 2119 us, ns = divmod(ns, 1000) 2120 ms, us = divmod(us, 1000) 2121 s, ms = divmod(ms, 1000) 2122 m, s = divmod(s, 60) 2123 h, m = divmod(m, 60) 2124 d, h = divmod(h, 24) 2125 y, d = divmod(d, 365) 2126 c, y = divmod(y, 100) 2127 n, c = divmod(c, 10) 2128 time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}" 2129 spoken_time_part = [] 2130 if n > 0 or show_zeros_in_spoken_time: 2131 spoken_time_part.append(f"{n: 3d} {millennia}") 2132 if c > 0 or show_zeros_in_spoken_time: 2133 spoken_time_part.append(f"{c: 4d} {century}") 2134 if y > 0 or show_zeros_in_spoken_time: 2135 spoken_time_part.append(f"{y: 3d} {years}") 2136 if d > 0 or show_zeros_in_spoken_time: 2137 spoken_time_part.append(f"{d: 4d} {days}") 2138 if h > 0 or show_zeros_in_spoken_time: 2139 spoken_time_part.append(f"{h: 2d} {hours}") 2140 if m > 0 or show_zeros_in_spoken_time: 2141 spoken_time_part.append(f"{m: 2d} {minutes}") 2142 if s > 0 or show_zeros_in_spoken_time: 2143 spoken_time_part.append(f"{s: 2d} {seconds}") 2144 if ms > 0 or show_zeros_in_spoken_time: 2145 spoken_time_part.append(f"{ms: 3d} {milli_seconds}") 2146 if us > 0 or show_zeros_in_spoken_time: 2147 spoken_time_part.append(f"{us: 3d} {micro_seconds}") 2148 if ns > 0 or show_zeros_in_spoken_time: 2149 spoken_time_part.append(f"{ns: 3d} {nano_seconds}") 2150 return time_lapsed, spoken_time_separator.join(spoken_time_part)
REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 Convert NanoSeconds to Human Readable Time Format. A NanoSeconds is a unit of time in the International System of Units (SI) equal to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second. Its symbol is μs, sometimes simplified to us when Unicode is not available. A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond.
INPUT : ms (AKA: MilliSeconds) OUTPUT: tuple(string time_lapsed, string spoken_time) like format. OUTPUT Variables: time_lapsed, spoken_time
Example Input: duration_from_nanoseconds(ns) "Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds" Example Output: ('039:0001:047:325:05:02:03:456:789:012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds') duration_from_nanoseconds(1234567890123456789012)
2152 @staticmethod 2153 def day_to_time(day: int, month: int = 6, year: int = 2024) -> int: # افتراض أن الشهر هو يونيو والسنة 2024 2154 """ 2155 Convert a specific day, month, and year into a timestamp. 2156 2157 Parameters: 2158 day (int): The day of the month. 2159 month (int): The month of the year. Default is 6 (June). 2160 year (int): The year. Default is 2024. 2161 2162 Returns: 2163 int: The timestamp representing the given day, month, and year. 2164 2165 Note: 2166 This method assumes the default month and year if not provided. 2167 """ 2168 return ZakatTracker.time(datetime.datetime(year, month, day))
Convert a specific day, month, and year into a timestamp.
Parameters: day (int): The day of the month. month (int): The month of the year. Default is 6 (June). year (int): The year. Default is 2024.
Returns: int: The timestamp representing the given day, month, and year.
Note: This method assumes the default month and year if not provided.
2170 @staticmethod 2171 def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime: 2172 """ 2173 Generate a random date between two given dates. 2174 2175 Parameters: 2176 start_date (datetime.datetime): The start date from which to generate a random date. 2177 end_date (datetime.datetime): The end date until which to generate a random date. 2178 2179 Returns: 2180 datetime.datetime: A random date between the start_date and end_date. 2181 """ 2182 time_between_dates = end_date - start_date 2183 days_between_dates = time_between_dates.days 2184 random_number_of_days = random.randrange(days_between_dates) 2185 return start_date + datetime.timedelta(days=random_number_of_days)
Generate a random date between two given dates.
Parameters: start_date (datetime.datetime): The start date from which to generate a random date. end_date (datetime.datetime): The end date until which to generate a random date.
Returns: datetime.datetime: A random date between the start_date and end_date.
2187 @staticmethod 2188 def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False, 2189 debug: bool = False) -> int: 2190 """ 2191 Generate a random CSV file with specified parameters. 2192 2193 Parameters: 2194 path (str): The path where the CSV file will be saved. Default is "data.csv". 2195 count (int): The number of rows to generate in the CSV file. Default is 1000. 2196 with_rate (bool): If True, a random rate between 1.2% and 12% is added. Default is False. 2197 debug (bool): A flag indicating whether to print debug information. 2198 2199 Returns: 2200 None. The function generates a CSV file at the specified path with the given count of rows. 2201 Each row contains a randomly generated account, description, value, and date. 2202 The value is randomly generated between 1000 and 100000, 2203 and the date is randomly generated between 1950-01-01 and 2023-12-31. 2204 If the row number is not divisible by 13, the value is multiplied by -1. 2205 """ 2206 if debug: 2207 print('generate_random_csv_file', f'debug={debug}') 2208 i = 0 2209 with open(path, "w", newline="") as csvfile: 2210 writer = csv.writer(csvfile) 2211 for i in range(count): 2212 account = f"acc-{random.randint(1, 1000)}" 2213 desc = f"Some text {random.randint(1, 1000)}" 2214 value = random.randint(1000, 100000) 2215 date = ZakatTracker.generate_random_date(datetime.datetime(1000, 1, 1), 2216 datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S") 2217 if not i % 13 == 0: 2218 value *= -1 2219 row = [account, desc, value, date] 2220 if with_rate: 2221 rate = random.randint(1, 100) * 0.12 2222 if debug: 2223 print('before-append', row) 2224 row.append(rate) 2225 if debug: 2226 print('after-append', row) 2227 writer.writerow(row) 2228 i = i + 1 2229 return i
Generate a random CSV file with specified parameters.
Parameters: path (str): The path where the CSV file will be saved. Default is "data.csv". count (int): The number of rows to generate in the CSV file. Default is 1000. with_rate (bool): If True, a random rate between 1.2% and 12% is added. Default is False. debug (bool): A flag indicating whether to print debug information.
Returns: None. The function generates a CSV file at the specified path with the given count of rows. Each row contains a randomly generated account, description, value, and date. The value is randomly generated between 1000 and 100000, and the date is randomly generated between 1950-01-01 and 2023-12-31. If the row number is not divisible by 13, the value is multiplied by -1.
2231 @staticmethod 2232 def create_random_list(max_sum, min_value=0, max_value=10): 2233 """ 2234 Creates a list of random integers whose sum does not exceed the specified maximum. 2235 2236 Args: 2237 max_sum: The maximum allowed sum of the list elements. 2238 min_value: The minimum possible value for an element (inclusive). 2239 max_value: The maximum possible value for an element (inclusive). 2240 2241 Returns: 2242 A list of random integers. 2243 """ 2244 result = [] 2245 current_sum = 0 2246 2247 while current_sum < max_sum: 2248 # Calculate the remaining space for the next element 2249 remaining_sum = max_sum - current_sum 2250 # Determine the maximum possible value for the next element 2251 next_max_value = min(remaining_sum, max_value) 2252 # Generate a random element within the allowed range 2253 next_element = random.randint(min_value, next_max_value) 2254 result.append(next_element) 2255 current_sum += next_element 2256 2257 return result
Creates a list of random integers whose sum does not exceed the specified maximum.
Args: max_sum: The maximum allowed sum of the list elements. min_value: The minimum possible value for an element (inclusive). max_value: The maximum possible value for an element (inclusive).
Returns: A list of random integers.
2488 def test(self, debug: bool = False) -> bool: 2489 if debug: 2490 print('test', f'debug={debug}') 2491 try: 2492 2493 self._test_core(True, debug) 2494 self._test_core(False, debug) 2495 2496 assert self._history() 2497 2498 # Not allowed for duplicate transactions in the same account and time 2499 2500 created = ZakatTracker.time() 2501 self.track(100, 'test-1', 'same', True, created) 2502 failed = False 2503 try: 2504 self.track(50, 'test-1', 'same', True, created) 2505 except: 2506 failed = True 2507 assert failed is True 2508 2509 self.reset() 2510 2511 # Same account transfer 2512 for x in [1, 'a', True, 1.8, None]: 2513 failed = False 2514 try: 2515 self.transfer(1, x, x, 'same-account', debug=debug) 2516 except: 2517 failed = True 2518 assert failed is True 2519 2520 # Always preserve box age during transfer 2521 2522 series: list[tuple] = [ 2523 (30, 4), 2524 (60, 3), 2525 (90, 2), 2526 ] 2527 case = { 2528 3000: { 2529 'series': series, 2530 'rest': 15000, 2531 }, 2532 6000: { 2533 'series': series, 2534 'rest': 12000, 2535 }, 2536 9000: { 2537 'series': series, 2538 'rest': 9000, 2539 }, 2540 18000: { 2541 'series': series, 2542 'rest': 0, 2543 }, 2544 27000: { 2545 'series': series, 2546 'rest': -9000, 2547 }, 2548 36000: { 2549 'series': series, 2550 'rest': -18000, 2551 }, 2552 } 2553 2554 selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle() 2555 2556 for total in case: 2557 if debug: 2558 print('--------------------------------------------------------') 2559 print(f'case[{total}]', case[total]) 2560 for x in case[total]['series']: 2561 self.track( 2562 unscaled_value=x[0], 2563 desc=f"test-{x} ages", 2564 account='ages', 2565 logging=True, 2566 created=selected_time * x[1], 2567 ) 2568 2569 unscaled_total = self.unscale(total) 2570 if debug: 2571 print('unscaled_total', unscaled_total) 2572 refs = self.transfer( 2573 unscaled_amount=unscaled_total, 2574 from_account='ages', 2575 to_account='future', 2576 desc='Zakat Movement', 2577 debug=debug, 2578 ) 2579 2580 if debug: 2581 print('refs', refs) 2582 2583 ages_cache_balance = self.balance('ages') 2584 ages_fresh_balance = self.balance('ages', False) 2585 rest = case[total]['rest'] 2586 if debug: 2587 print('source', ages_cache_balance, ages_fresh_balance, rest) 2588 assert ages_cache_balance == rest 2589 assert ages_fresh_balance == rest 2590 2591 future_cache_balance = self.balance('future') 2592 future_fresh_balance = self.balance('future', False) 2593 if debug: 2594 print('target', future_cache_balance, future_fresh_balance, total) 2595 print('refs', refs) 2596 assert future_cache_balance == total 2597 assert future_fresh_balance == total 2598 2599 # TODO: check boxes times for `ages` should equal box times in `future` 2600 for ref in self._vault['account']['ages']['box']: 2601 ages_capital = self._vault['account']['ages']['box'][ref]['capital'] 2602 ages_rest = self._vault['account']['ages']['box'][ref]['rest'] 2603 future_capital = 0 2604 if ref in self._vault['account']['future']['box']: 2605 future_capital = self._vault['account']['future']['box'][ref]['capital'] 2606 future_rest = 0 2607 if ref in self._vault['account']['future']['box']: 2608 future_rest = self._vault['account']['future']['box'][ref]['rest'] 2609 if ages_capital != 0 and future_capital != 0 and future_rest != 0: 2610 if debug: 2611 print('================================================================') 2612 print('ages', ages_capital, ages_rest) 2613 print('future', future_capital, future_rest) 2614 if ages_rest == 0: 2615 assert ages_capital == future_capital 2616 elif ages_rest < 0: 2617 assert -ages_capital == future_capital 2618 elif ages_rest > 0: 2619 assert ages_capital == ages_rest + future_capital 2620 self.reset() 2621 assert len(self._vault['history']) == 0 2622 2623 assert self._history() 2624 assert self._history(False) is False 2625 assert self._history() is False 2626 assert self._history(True) 2627 assert self._history() 2628 if debug: 2629 print('####################################################################') 2630 2631 transaction = [ 2632 ( 2633 20, 'wallet', 1, -2000, -2000, -2000, 1, 1, 2634 2000, 2000, 2000, 1, 1, 2635 ), 2636 ( 2637 750, 'wallet', 'safe', -77000, -77000, -77000, 2, 2, 2638 75000, 75000, 75000, 1, 1, 2639 ), 2640 ( 2641 600, 'safe', 'bank', 15000, 15000, 15000, 1, 2, 2642 60000, 60000, 60000, 1, 1, 2643 ), 2644 ] 2645 for z in transaction: 2646 self.lock() 2647 x = z[1] 2648 y = z[2] 2649 self.transfer( 2650 unscaled_amount=z[0], 2651 from_account=x, 2652 to_account=y, 2653 desc='test-transfer', 2654 debug=debug, 2655 ) 2656 zz = self.balance(x) 2657 if debug: 2658 print(zz, z) 2659 assert zz == z[3] 2660 xx = self.accounts()[x] 2661 assert xx == z[3] 2662 assert self.balance(x, False) == z[4] 2663 assert xx == z[4] 2664 2665 s = 0 2666 log = self._vault['account'][x]['log'] 2667 for i in log: 2668 s += log[i]['value'] 2669 if debug: 2670 print('s', s, 'z[5]', z[5]) 2671 assert s == z[5] 2672 2673 assert self.box_size(x) == z[6] 2674 assert self.log_size(x) == z[7] 2675 2676 yy = self.accounts()[y] 2677 assert self.balance(y) == z[8] 2678 assert yy == z[8] 2679 assert self.balance(y, False) == z[9] 2680 assert yy == z[9] 2681 2682 s = 0 2683 log = self._vault['account'][y]['log'] 2684 for i in log: 2685 s += log[i]['value'] 2686 assert s == z[10] 2687 2688 assert self.box_size(y) == z[11] 2689 assert self.log_size(y) == z[12] 2690 assert self.free(self.lock()) 2691 2692 if debug: 2693 pp().pprint(self.check(2.17)) 2694 2695 assert not self.nolock() 2696 history_count = len(self._vault['history']) 2697 if debug: 2698 print('history-count', history_count) 2699 assert history_count == 4 2700 assert not self.free(ZakatTracker.time()) 2701 assert self.free(self.lock()) 2702 assert self.nolock() 2703 assert len(self._vault['history']) == 3 2704 2705 # storage 2706 2707 _path = self.path(f'./zakat_test_db/test.{self.ext()}') 2708 if os.path.exists(_path): 2709 os.remove(_path) 2710 self.save() 2711 assert os.path.getsize(_path) > 0 2712 self.reset() 2713 assert self.recall(False, debug) is False 2714 self.load() 2715 assert self._vault['account'] is not None 2716 2717 # recall 2718 2719 assert self.nolock() 2720 assert len(self._vault['history']) == 3 2721 assert self.recall(False, debug) is True 2722 assert len(self._vault['history']) == 2 2723 assert self.recall(False, debug) is True 2724 assert len(self._vault['history']) == 1 2725 assert self.recall(False, debug) is True 2726 assert len(self._vault['history']) == 0 2727 assert self.recall(False, debug) is False 2728 assert len(self._vault['history']) == 0 2729 2730 # exchange 2731 2732 self.exchange("cash", 25, 3.75, "2024-06-25") 2733 self.exchange("cash", 22, 3.73, "2024-06-22") 2734 self.exchange("cash", 15, 3.69, "2024-06-15") 2735 self.exchange("cash", 10, 3.66) 2736 2737 for i in range(1, 30): 2738 exchange = self.exchange("cash", i) 2739 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2740 if debug: 2741 print(i, rate, description, created) 2742 assert created 2743 if i < 10: 2744 assert rate == 1 2745 assert description is None 2746 elif i == 10: 2747 assert rate == 3.66 2748 assert description is None 2749 elif i < 15: 2750 assert rate == 3.66 2751 assert description is None 2752 elif i == 15: 2753 assert rate == 3.69 2754 assert description is not None 2755 elif i < 22: 2756 assert rate == 3.69 2757 assert description is not None 2758 elif i == 22: 2759 assert rate == 3.73 2760 assert description is not None 2761 elif i >= 25: 2762 assert rate == 3.75 2763 assert description is not None 2764 exchange = self.exchange("bank", i) 2765 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2766 if debug: 2767 print(i, rate, description, created) 2768 assert created 2769 assert rate == 1 2770 assert description is None 2771 2772 assert len(self._vault['exchange']) > 0 2773 assert len(self.exchanges()) > 0 2774 self._vault['exchange'].clear() 2775 assert len(self._vault['exchange']) == 0 2776 assert len(self.exchanges()) == 0 2777 2778 # حفظ أسعار الصرف باستخدام التواريخ بالنانو ثانية 2779 self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25") 2780 self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22") 2781 self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15") 2782 self.exchange("cash", ZakatTracker.day_to_time(10), 3.66) 2783 2784 for i in [x * 0.12 for x in range(-15, 21)]: 2785 if i <= 0: 2786 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0 2787 else: 2788 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0 2789 2790 # اختبار النتائج باستخدام التواريخ بالنانو ثانية 2791 for i in range(1, 31): 2792 timestamp_ns = ZakatTracker.day_to_time(i) 2793 exchange = self.exchange("cash", timestamp_ns) 2794 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2795 if debug: 2796 print(i, rate, description, created) 2797 assert created 2798 if i < 10: 2799 assert rate == 1 2800 assert description is None 2801 elif i == 10: 2802 assert rate == 3.66 2803 assert description is None 2804 elif i < 15: 2805 assert rate == 3.66 2806 assert description is None 2807 elif i == 15: 2808 assert rate == 3.69 2809 assert description is not None 2810 elif i < 22: 2811 assert rate == 3.69 2812 assert description is not None 2813 elif i == 22: 2814 assert rate == 3.73 2815 assert description is not None 2816 elif i >= 25: 2817 assert rate == 3.75 2818 assert description is not None 2819 exchange = self.exchange("bank", i) 2820 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2821 if debug: 2822 print(i, rate, description, created) 2823 assert created 2824 assert rate == 1 2825 assert description is None 2826 2827 # csv 2828 2829 csv_count = 1000 2830 2831 for with_rate, path in { 2832 False: 'test-import_csv-no-exchange', 2833 True: 'test-import_csv-with-exchange', 2834 }.items(): 2835 2836 if debug: 2837 print('test_import_csv', with_rate, path) 2838 2839 csv_path = path + '.csv' 2840 if os.path.exists(csv_path): 2841 os.remove(csv_path) 2842 c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug) 2843 if debug: 2844 print('generate_random_csv_file', c) 2845 assert c == csv_count 2846 assert os.path.getsize(csv_path) > 0 2847 cache_path = self.import_csv_cache_path() 2848 if os.path.exists(cache_path): 2849 os.remove(cache_path) 2850 self.reset() 2851 (created, found, bad) = self.import_csv(csv_path, debug) 2852 bad_count = len(bad) 2853 assert bad_count > 0 2854 if debug: 2855 print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})") 2856 print('bad', bad) 2857 tmp_size = os.path.getsize(cache_path) 2858 assert tmp_size > 0 2859 # TODO: assert created + found + bad_count == csv_count 2860 # TODO: assert created == csv_count 2861 # TODO: assert bad_count == 0 2862 (created_2, found_2, bad_2) = self.import_csv(csv_path) 2863 bad_2_count = len(bad_2) 2864 if debug: 2865 print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})") 2866 print('bad', bad) 2867 assert bad_2_count > 0 2868 # TODO: assert tmp_size == os.path.getsize(cache_path) 2869 # TODO: assert created_2 + found_2 + bad_2_count == csv_count 2870 # TODO: assert created == found_2 2871 # TODO: assert bad_count == bad_2_count 2872 # TODO: assert found_2 == csv_count 2873 # TODO: assert bad_2_count == 0 2874 # TODO: assert created_2 == 0 2875 2876 # payment parts 2877 2878 positive_parts = self.build_payment_parts(100, positive_only=True) 2879 assert self.check_payment_parts(positive_parts) != 0 2880 assert self.check_payment_parts(positive_parts) != 0 2881 all_parts = self.build_payment_parts(300, positive_only=False) 2882 assert self.check_payment_parts(all_parts) != 0 2883 assert self.check_payment_parts(all_parts) != 0 2884 if debug: 2885 pp().pprint(positive_parts) 2886 pp().pprint(all_parts) 2887 # dynamic discount 2888 suite = [] 2889 count = 3 2890 for exceed in [False, True]: 2891 case = [] 2892 for parts in [positive_parts, all_parts]: 2893 part = parts.copy() 2894 demand = part['demand'] 2895 if debug: 2896 print(demand, part['total']) 2897 i = 0 2898 z = demand / count 2899 cp = { 2900 'account': {}, 2901 'demand': demand, 2902 'exceed': exceed, 2903 'total': part['total'], 2904 } 2905 j = '' 2906 for x, y in part['account'].items(): 2907 x_exchange = self.exchange(x) 2908 zz = self.exchange_calc(z, 1, x_exchange['rate']) 2909 if exceed and zz <= demand: 2910 i += 1 2911 y['part'] = zz 2912 if debug: 2913 print(exceed, y) 2914 cp['account'][x] = y 2915 case.append(y) 2916 elif not exceed and y['balance'] >= zz: 2917 i += 1 2918 y['part'] = zz 2919 if debug: 2920 print(exceed, y) 2921 cp['account'][x] = y 2922 case.append(y) 2923 j = x 2924 if i >= count: 2925 break 2926 if len(cp['account'][j]) > 0: 2927 suite.append(cp) 2928 if debug: 2929 print('suite', len(suite)) 2930 # vault = self._vault.copy() 2931 for case in suite: 2932 # self._vault = vault.copy() 2933 if debug: 2934 print('case', case) 2935 result = self.check_payment_parts(case) 2936 if debug: 2937 print('check_payment_parts', result, f'exceed: {exceed}') 2938 assert result == 0 2939 2940 report = self.check(2.17, None, debug) 2941 (valid, brief, plan) = report 2942 if debug: 2943 print('valid', valid) 2944 zakat_result = self.zakat(report, parts=case, debug=debug) 2945 if debug: 2946 print('zakat-result', zakat_result) 2947 assert valid == zakat_result 2948 2949 assert self.save(path + f'.{self.ext()}') 2950 assert self.export_json(path + '.json') 2951 2952 assert self.export_json("1000-transactions-test.json") 2953 assert self.save(f"1000-transactions-test.{self.ext()}") 2954 2955 self.reset() 2956 2957 # test transfer between accounts with different exchange rate 2958 2959 a_SAR = "Bank (SAR)" 2960 b_USD = "Bank (USD)" 2961 c_SAR = "Safe (SAR)" 2962 # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer 2963 for case in [ 2964 (0, a_SAR, "SAR Gift", 1000, 100000), 2965 (1, a_SAR, 1), 2966 (0, b_USD, "USD Gift", 500, 50000), 2967 (1, b_USD, 1), 2968 (2, b_USD, 3.75), 2969 (1, b_USD, 3.75), 2970 (3, 100, b_USD, a_SAR, "100 USD -> SAR", 40000, 137500), 2971 (0, c_SAR, "Salary", 750, 75000), 2972 (3, 375, c_SAR, b_USD, "375 SAR -> USD", 37500, 50000), 2973 (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 137125, 50100), 2974 ]: 2975 if debug: 2976 print('case', case) 2977 match (case[0]): 2978 case 0: # track 2979 _, account, desc, x, balance = case 2980 self.track(unscaled_value=x, desc=desc, account=account, debug=debug) 2981 2982 cached_value = self.balance(account, cached=True) 2983 fresh_value = self.balance(account, cached=False) 2984 if debug: 2985 print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value) 2986 assert cached_value == balance 2987 assert fresh_value == balance 2988 case 1: # check-exchange 2989 _, account, expected_rate = case 2990 t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2991 if debug: 2992 print('t-exchange', t_exchange) 2993 assert t_exchange['rate'] == expected_rate 2994 case 2: # do-exchange 2995 _, account, rate = case 2996 self.exchange(account, rate=rate, debug=debug) 2997 b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2998 if debug: 2999 print('b-exchange', b_exchange) 3000 assert b_exchange['rate'] == rate 3001 case 3: # transfer 3002 _, x, a, b, desc, a_balance, b_balance = case 3003 self.transfer(x, a, b, desc, debug=debug) 3004 3005 cached_value = self.balance(a, cached=True) 3006 fresh_value = self.balance(a, cached=False) 3007 if debug: 3008 print('account', a, 'cached_value', cached_value, 'fresh_value', fresh_value, 'a_balance', a_balance) 3009 assert cached_value == a_balance 3010 assert fresh_value == a_balance 3011 3012 cached_value = self.balance(b, cached=True) 3013 fresh_value = self.balance(b, cached=False) 3014 if debug: 3015 print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value) 3016 assert cached_value == b_balance 3017 assert fresh_value == b_balance 3018 3019 # Transfer all in many chunks randomly from B to A 3020 a_SAR_balance = 137125 3021 b_USD_balance = 50100 3022 b_USD_exchange = self.exchange(b_USD) 3023 amounts = ZakatTracker.create_random_list(b_USD_balance, max_value=1000) 3024 if debug: 3025 print('amounts', amounts) 3026 i = 0 3027 for x in amounts: 3028 if debug: 3029 print(f'{i} - transfer-with-exchange({x})') 3030 self.transfer( 3031 unscaled_amount=self.unscale(x), 3032 from_account=b_USD, 3033 to_account=a_SAR, 3034 desc=f"{x} USD -> SAR", 3035 debug=debug, 3036 ) 3037 3038 b_USD_balance -= x 3039 cached_value = self.balance(b_USD, cached=True) 3040 fresh_value = self.balance(b_USD, cached=False) 3041 if debug: 3042 print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3043 b_USD_balance) 3044 assert cached_value == b_USD_balance 3045 assert fresh_value == b_USD_balance 3046 3047 a_SAR_balance += int(x * b_USD_exchange['rate']) 3048 cached_value = self.balance(a_SAR, cached=True) 3049 fresh_value = self.balance(a_SAR, cached=False) 3050 if debug: 3051 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3052 a_SAR_balance, 'rate', b_USD_exchange['rate']) 3053 assert cached_value == a_SAR_balance 3054 assert fresh_value == a_SAR_balance 3055 i += 1 3056 3057 # Transfer all in many chunks randomly from C to A 3058 c_SAR_balance = 37500 3059 amounts = ZakatTracker.create_random_list(c_SAR_balance, max_value=1000) 3060 if debug: 3061 print('amounts', amounts) 3062 i = 0 3063 for x in amounts: 3064 if debug: 3065 print(f'{i} - transfer-with-exchange({x})') 3066 self.transfer( 3067 unscaled_amount=self.unscale(x), 3068 from_account=c_SAR, 3069 to_account=a_SAR, 3070 desc=f"{x} SAR -> a_SAR", 3071 debug=debug, 3072 ) 3073 3074 c_SAR_balance -= x 3075 cached_value = self.balance(c_SAR, cached=True) 3076 fresh_value = self.balance(c_SAR, cached=False) 3077 if debug: 3078 print('account', c_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 3079 c_SAR_balance) 3080 assert cached_value == c_SAR_balance 3081 assert fresh_value == c_SAR_balance 3082 3083 a_SAR_balance += x 3084 cached_value = self.balance(a_SAR, cached=True) 3085 fresh_value = self.balance(a_SAR, cached=False) 3086 if debug: 3087 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 3088 a_SAR_balance) 3089 assert cached_value == a_SAR_balance 3090 assert fresh_value == a_SAR_balance 3091 i += 1 3092 3093 assert self.export_json("accounts-transfer-with-exchange-rates.json") 3094 assert self.save(f"accounts-transfer-with-exchange-rates.{self.ext()}") 3095 3096 # check & zakat with exchange rates for many cycles 3097 3098 for rate, values in { 3099 1: { 3100 'in': [1000, 2000, 10000], 3101 'exchanged': [100000, 200000, 1000000], 3102 'out': [2500, 5000, 73140], 3103 }, 3104 3.75: { 3105 'in': [200, 1000, 5000], 3106 'exchanged': [75000, 375000, 1875000], 3107 'out': [1875, 9375, 137138], 3108 }, 3109 }.items(): 3110 a, b, c = values['in'] 3111 m, n, o = values['exchanged'] 3112 x, y, z = values['out'] 3113 if debug: 3114 print('rate', rate, 'values', values) 3115 for case in [ 3116 (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3117 {'safe': {0: {'below_nisab': x}}}, 3118 ], False, m), 3119 (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 3120 {'safe': {0: {'count': 1, 'total': y}}}, 3121 ], True, n), 3122 (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [ 3123 {'cave': {0: {'count': 3, 'total': z}}}, 3124 ], True, o), 3125 ]: 3126 if debug: 3127 print(f"############# check(rate: {rate}) #############") 3128 print('case', case) 3129 self.reset() 3130 self.exchange(account=case[1], created=case[2], rate=rate) 3131 self.track(unscaled_value=case[0], desc='test-check', account=case[1], logging=True, created=case[2]) 3132 assert self.snapshot() 3133 3134 # assert self.nolock() 3135 # history_size = len(self._vault['history']) 3136 # print('history_size', history_size) 3137 # assert history_size == 2 3138 assert self.lock() 3139 assert not self.nolock() 3140 report = self.check(2.17, None, debug) 3141 (valid, brief, plan) = report 3142 if debug: 3143 print('brief', brief) 3144 assert valid == case[4] 3145 assert case[5] == brief[0] 3146 assert case[5] == brief[1] 3147 3148 if debug: 3149 pp().pprint(plan) 3150 3151 for x in plan: 3152 assert case[1] == x 3153 if 'total' in case[3][0][x][0].keys(): 3154 assert case[3][0][x][0]['total'] == int(brief[2]) 3155 assert int(plan[x][0]['total']) == case[3][0][x][0]['total'] 3156 assert int(plan[x][0]['count']) == case[3][0][x][0]['count'] 3157 else: 3158 assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab'] 3159 if debug: 3160 pp().pprint(report) 3161 result = self.zakat(report, debug=debug) 3162 if debug: 3163 print('zakat-result', result, case[4]) 3164 assert result == case[4] 3165 report = self.check(2.17, None, debug) 3166 (valid, brief, plan) = report 3167 assert valid is False 3168 3169 history_size = len(self._vault['history']) 3170 if debug: 3171 print('history_size', history_size) 3172 assert history_size == 3 3173 assert not self.nolock() 3174 assert self.recall(False, debug) is False 3175 self.free(self.lock()) 3176 assert self.nolock() 3177 3178 for i in range(3, 0, -1): 3179 history_size = len(self._vault['history']) 3180 if debug: 3181 print('history_size', history_size) 3182 assert history_size == i 3183 assert self.recall(False, debug) is True 3184 3185 assert self.nolock() 3186 assert self.recall(False, debug) is False 3187 3188 history_size = len(self._vault['history']) 3189 if debug: 3190 print('history_size', history_size) 3191 assert history_size == 0 3192 3193 account_size = len(self._vault['account']) 3194 if debug: 3195 print('account_size', account_size) 3196 assert account_size == 0 3197 3198 report_size = len(self._vault['report']) 3199 if debug: 3200 print('report_size', report_size) 3201 assert report_size == 0 3202 3203 assert self.nolock() 3204 return True 3205 except Exception as e: 3206 # pp().pprint(self._vault) 3207 assert self.export_json("test-snapshot.json") 3208 assert self.save(f"test-snapshot.{self.ext()}") 3209 raise e
3212def test(debug: bool = False): 3213 ledger = ZakatTracker("./zakat_test_db/zakat.camel") 3214 start = ZakatTracker.time() 3215 assert ledger.test(debug=debug) 3216 if debug: 3217 print("#########################") 3218 print("######## TEST DONE ########") 3219 print("#########################") 3220 print(ZakatTracker.duration_from_nanoseconds(ZakatTracker.time() - start)) 3221 print("#########################")
76class Action(Enum): 77 CREATE = auto() 78 TRACK = auto() 79 LOG = auto() 80 SUB = auto() 81 ADD_FILE = auto() 82 REMOVE_FILE = auto() 83 BOX_TRANSFER = auto() 84 EXCHANGE = auto() 85 REPORT = auto() 86 ZAKAT = auto()
89class JSONEncoder(json.JSONEncoder): 90 def default(self, obj): 91 if isinstance(obj, Action) or isinstance(obj, MathOperation): 92 return obj.name # Serialize as the enum member's name 93 elif isinstance(obj, Decimal): 94 return float(obj) 95 return super().default(obj)
Extensible JSON https://json.org encoder for Python data structures.
Supports the following objects and types by default:
+-------------------+---------------+ | Python | JSON | +===================+===============+ | dict | object | +-------------------+---------------+ | list, tuple | array | +-------------------+---------------+ | str | string | +-------------------+---------------+ | int, float | number | +-------------------+---------------+ | True | true | +-------------------+---------------+ | False | false | +-------------------+---------------+ | None | null | +-------------------+---------------+
To extend this to recognize other objects, subclass and implement a
.default()
method with another method that returns a serializable
object for o
if possible, otherwise it should call the superclass
implementation (to raise TypeError
).
90 def default(self, obj): 91 if isinstance(obj, Action) or isinstance(obj, MathOperation): 92 return obj.name # Serialize as the enum member's name 93 elif isinstance(obj, Decimal): 94 return float(obj) 95 return super().default(obj)
Implement this method in a subclass such that it returns
a serializable object for o
, or calls the base implementation
(to raise a TypeError
).
For example, to support arbitrary iterators, you could implement default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
# Let the base class default method raise the TypeError
return super().default(o)
55def start_file_server(database_path: str, database_callback: callable = None, csv_callback: callable = None, 56 debug: bool = False) -> tuple: 57 """ 58 Starts a multi-purpose HTTP server to manage file interactions for a Zakat application. 59 60 This server facilitates the following functionalities: 61 62 1. GET /{file_uuid}/get: Download the database file specified by `database_path`. 63 2. GET /{file_uuid}/upload: Display an HTML form for uploading files. 64 3. POST /{file_uuid}/upload: Handle file uploads, distinguishing between: 65 - Database File (.db): Replaces the existing database with the uploaded one. 66 - CSV File (.csv): Imports data from the CSV into the existing database. 67 68 Args: 69 database_path (str): The path to the pickle database file. 70 database_callback (callable, optional): A function to call after a successful database upload. 71 It receives the uploaded database path as its argument. 72 csv_callback (callable, optional): A function to call after a successful CSV upload. It receives the uploaded CSV path, 73 the database path, and the debug flag as its arguments. 74 debug (bool, optional): If True, print debugging information. Defaults to False. 75 76 Returns: 77 Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing: 78 - file_name (str): The name of the database file. 79 - download_url (str): The URL to download the database file. 80 - upload_url (str): The URL to access the file upload form. 81 - server_thread (threading.Thread): The thread running the server. 82 - shutdown_server (Callable[[], None]): A function to gracefully shut down the server. 83 84 Example: 85 _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db") 86 print(f"Download database: {download_url}") 87 print(f"Upload files: {upload_url}") 88 server_thread.start() 89 # ... later ... 90 shutdown_server() 91 """ 92 file_uuid = uuid.uuid4() 93 file_name = os.path.basename(database_path) 94 95 port = find_available_port() 96 download_url = f"http://localhost:{port}/{file_uuid}/get" 97 upload_url = f"http://localhost:{port}/{file_uuid}/upload" 98 99 class Handler(http.server.SimpleHTTPRequestHandler): 100 def do_GET(self): 101 if self.path == f"/{file_uuid}/get": 102 # GET: Serve the existing file 103 try: 104 with open(database_path, "rb") as f: 105 self.send_response(200) 106 self.send_header("Content-type", "application/octet-stream") 107 self.send_header("Content-Disposition", f'attachment; filename="{file_name}"') 108 self.end_headers() 109 self.wfile.write(f.read()) 110 except FileNotFoundError: 111 self.send_error(404, "File not found") 112 elif self.path == f"/{file_uuid}/upload": 113 # GET: Serve the upload form 114 self.send_response(200) 115 self.send_header("Content-type", "text/html") 116 self.end_headers() 117 self.wfile.write(f""" 118 <html lang="en"> 119 <head> 120 <title>Zakat File Server</title> 121 </head> 122 <body> 123 <h1>Zakat File Server</h1> 124 <h3>You can download the <a target="__blank" href="{download_url}">database file</a>...</h3> 125 <h3>Or upload a new file to restore a database or import `CSV` file:</h3> 126 <form action="/{file_uuid}/upload" method="post" enctype="multipart/form-data"> 127 <input type="file" name="file" required><br/> 128 <input type="radio" id="{FileType.Database.value}" name="upload_type" value="{FileType.Database.value}" required> 129 <label for="database">Database File</label><br/> 130 <input type="radio"id="{FileType.CSV.value}" name="upload_type" value="{FileType.CSV.value}"> 131 <label for="csv">CSV File</label><br/> 132 <input type="submit" value="Upload"><br/> 133 </form> 134 </body></html> 135 """.encode()) 136 else: 137 self.send_error(404) 138 139 def do_POST(self): 140 if self.path == f"/{file_uuid}/upload": 141 # POST: Handle request 142 # 1. Get the Form Data 143 form_data = cgi.FieldStorage( 144 fp=self.rfile, 145 headers=self.headers, 146 environ={'REQUEST_METHOD': 'POST'} 147 ) 148 upload_type = form_data.getvalue("upload_type") 149 150 if debug: 151 print('upload_type', upload_type) 152 153 if upload_type not in [FileType.Database.value, FileType.CSV.value]: 154 self.send_error(400, "Invalid upload type") 155 return 156 157 # 2. Extract File Data 158 file_item = form_data['file'] # Assuming 'file' is your file input name 159 160 # 3. Get File Details 161 filename = file_item.filename 162 file_data = file_item.file.read() # Read the file's content 163 164 if debug: 165 print(f'Uploaded filename: {filename}') 166 167 # 4. Define Storage Path for CSV 168 upload_directory = "./uploads" # Create this directory if it doesn't exist 169 os.makedirs(upload_directory, exist_ok=True) 170 file_path = os.path.join(upload_directory, upload_type) 171 172 # 5. Write to Disk 173 with open(file_path, 'wb') as f: 174 f.write(file_data) 175 176 match upload_type: 177 case FileType.Database.value: 178 179 try: 180 # 6. Verify database file 181 # ZakatTracker(db_path=file_path) # FATAL, Circular Imports Error 182 if database_callback is not None: 183 database_callback(file_path) 184 185 # 7. Copy database into the original path 186 shutil.copy2(file_path, database_path) 187 except Exception as e: 188 self.send_error(400, str(e)) 189 return 190 191 case FileType.CSV.value: 192 # 6. Verify CSV file 193 try: 194 # x = ZakatTracker(db_path=database_path) # FATAL, Circular Imports Error 195 # result = x.import_csv(file_path, debug=debug) 196 if csv_callback is not None: 197 result = csv_callback(file_path, database_path, debug) 198 if debug: 199 print(f'CSV imported: {result}') 200 if len(result[2]) != 0: 201 self.send_response(200) 202 self.end_headers() 203 self.wfile.write(json.dumps(result).encode()) 204 return 205 except Exception as e: 206 self.send_error(400, str(e)) 207 return 208 209 self.send_response(200) 210 self.end_headers() 211 self.wfile.write(b"File uploaded successfully.") 212 213 httpd = socketserver.TCPServer(("localhost", port), Handler) 214 server_thread = threading.Thread(target=httpd.serve_forever) 215 216 def shutdown_server(): 217 nonlocal httpd, server_thread 218 httpd.shutdown() 219 httpd.server_close() # Close the socket 220 server_thread.join() # Wait for the thread to finish 221 222 return file_name, download_url, upload_url, server_thread, shutdown_server
Starts a multi-purpose HTTP server to manage file interactions for a Zakat application.
This server facilitates the following functionalities:
- GET /{file_uuid}/get: Download the database file specified by
database_path
. - GET /{file_uuid}/upload: Display an HTML form for uploading files.
- POST /{file_uuid}/upload: Handle file uploads, distinguishing between:
- Database File (.db): Replaces the existing database with the uploaded one.
- CSV File (.csv): Imports data from the CSV into the existing database.
Args: database_path (str): The path to the pickle database file. database_callback (callable, optional): A function to call after a successful database upload. It receives the uploaded database path as its argument. csv_callback (callable, optional): A function to call after a successful CSV upload. It receives the uploaded CSV path, the database path, and the debug flag as its arguments. debug (bool, optional): If True, print debugging information. Defaults to False.
Returns: Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing: - file_name (str): The name of the database file. - download_url (str): The URL to download the database file. - upload_url (str): The URL to access the file upload form. - server_thread (threading.Thread): The thread running the server. - shutdown_server (Callable[[], None]): A function to gracefully shut down the server.
Example: _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db") print(f"Download database: {download_url}") print(f"Upload files: {upload_url}") server_thread.start() # ... later ... shutdown_server()
32def find_available_port() -> int: 33 """ 34 Finds and returns an available TCP port on the local machine. 35 36 This function utilizes a TCP server socket to bind to port 0, which 37 instructs the operating system to automatically assign an available 38 port. The assigned port is then extracted and returned. 39 40 Returns: 41 int: The available TCP port number. 42 43 Raises: 44 OSError: If an error occurs during the port binding process, such 45 as all ports being in use. 46 47 Example: 48 port = find_available_port() 49 print(f"Available port: {port}") 50 """ 51 with socketserver.TCPServer(("localhost", 0), None) as s: 52 return s.server_address[1]
Finds and returns an available TCP port on the local machine.
This function utilizes a TCP server socket to bind to port 0, which instructs the operating system to automatically assign an available port. The assigned port is then extracted and returned.
Returns: int: The available TCP port number.
Raises: OSError: If an error occurs during the port binding process, such as all ports being in use.
Example: port = find_available_port() print(f"Available port: {port}")