zakat
xxx
_____ _ _ _ _ _
|__ /__ _| | ____ _| |_ | | (_) |__ _ __ __ _ _ __ _ _
/ // _| |/ / _
| __| | | | | '_ \| '__/ _` | '__| | | |
/ /| (_| | < (_| | |_ | |___| | |_) | | | (_| | | | |_| |
/______,_|_|___,_|__| |_____|_|_.__/|_| __,_|_| __, |
|___/
"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف ... Never Trust, Always Verify ...
This file provides the ZakatLibrary classes, functions for tracking and calculating Zakat.
1""" 2 _____ _ _ _ _ _ 3|__ /__ _| | ____ _| |_ | | (_) |__ _ __ __ _ _ __ _ _ 4 / // _` | |/ / _` | __| | | | | '_ \| '__/ _` | '__| | | | 5 / /| (_| | < (_| | |_ | |___| | |_) | | | (_| | | | |_| | 6/____\__,_|_|\_\__,_|\__| |_____|_|_.__/|_| \__,_|_| \__, | 7 |___/ 8 9"رَبَّنَا افْتَحْ بَيْنَنَا وَبَيْنَ قَوْمِنَا بِالْحَقِّ وَأَنتَ خَيْرُ الْفَاتِحِينَ (89)" -- سورة الأعراف 10... Never Trust, Always Verify ... 11 12This file provides the ZakatLibrary classes, functions for tracking and calculating Zakat. 13""" 14# Importing necessary classes and functions from the main module 15from zakat.zakat_tracker import ( 16 ZakatTracker, 17 test, 18 Action, 19 JSONEncoder, 20 MathOperation, 21) 22 23from zakat.file_server import ( 24 start_file_server, 25 find_available_port, 26 FileType, 27) 28 29# Version information for the module 30__version__ = ZakatTracker.Version() 31__all__ = [ 32 "ZakatTracker", 33 "test", 34 "Action", 35 "JSONEncoder", 36 "MathOperation", 37 "start_file_server", 38 "find_available_port", 39 "FileType", 40]
101class ZakatTracker: 102 """ 103 A class for tracking and calculating Zakat. 104 105 This class provides functionalities for recording transactions, calculating Zakat due, 106 and managing account balances. It also offers features like importing transactions from 107 CSV files, exporting data to JSON format, and saving/loading the tracker state. 108 109 The `ZakatTracker` class is designed to handle both positive and negative transactions, 110 allowing for flexible tracking of financial activities related to Zakat. It also supports 111 the concept of a "Nisab" (minimum threshold for Zakat) and a "haul" (complete one year for Transaction) can calculate Zakat due 112 based on the current silver price. 113 114 The class uses a pickle file as its database to persist the tracker state, 115 ensuring data integrity across sessions. It also provides options for enabling or 116 disabling history tracking, allowing users to choose their preferred level of detail. 117 118 In addition, the `ZakatTracker` class includes various helper methods like 119 `time`, `time_to_datetime`, `lock`, `free`, `recall`, `export_json`, 120 and more. These methods provide additional functionalities and flexibility 121 for interacting with and managing the Zakat tracker. 122 123 Attributes: 124 ZakatTracker.ZakatCut (function): A function to calculate the Zakat percentage. 125 ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat. 126 ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price. 127 ZakatTracker.Version (function): The version of the ZakatTracker class. 128 129 Data Structure: 130 The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data. 131 132 _vault (dict): 133 - account (dict): 134 - {account_number} (dict): 135 - balance (int): The current balance of the account. 136 - box (dict): A dictionary storing transaction details. 137 - {timestamp} (dict): 138 - capital (int): The initial amount of the transaction. 139 - count (int): The number of times Zakat has been calculated for this transaction. 140 - last (int): The timestamp of the last Zakat calculation. 141 - rest (int): The remaining amount after Zakat deductions and withdrawal. 142 - total (int): The total Zakat deducted from this transaction. 143 - count (int): The total number of transactions for the account. 144 - log (dict): A dictionary storing transaction logs. 145 - {timestamp} (dict): 146 - value (int): The transaction amount (positive or negative). 147 - desc (str): The description of the transaction. 148 - ref (int): The box reference (positive or None). 149 - file (dict): A dictionary storing file references associated with the transaction. 150 - hide (bool): Indicates whether the account is hidden or not. 151 - zakatable (bool): Indicates whether the account is subject to Zakat. 152 - exchange (dict): 153 - account (dict): 154 - {timestamps} (dict): 155 - rate (float): Exchange rate when compared to local currency. 156 - description (str): The description of the exchange rate. 157 - history (dict): 158 - {timestamp} (list): A list of dictionaries storing the history of actions performed. 159 - {action_dict} (dict): 160 - action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT). 161 - account (str): The account number associated with the action. 162 - ref (int): The reference number of the transaction. 163 - file (int): The reference number of the file (if applicable). 164 - key (str): The key associated with the action (e.g., 'rest', 'total'). 165 - value (int): The value associated with the action. 166 - math (MathOperation): The mathematical operation performed (if applicable). 167 - lock (int or None): The timestamp indicating the current lock status (None if not locked). 168 - report (dict): 169 - {timestamp} (tuple): A tuple storing Zakat report details. 170 171 """ 172 173 @staticmethod 174 def Version(): 175 """ 176 Returns the current version of the software. 177 178 This function returns a string representing the current version of the software, 179 including major, minor, and patch version numbers in the format "X.Y.Z". 180 181 Returns: 182 str: The current version of the software. 183 """ 184 return '0.2.76' 185 186 @staticmethod 187 def ZakatCut(x: float) -> float: 188 """ 189 Calculates the Zakat amount due on an asset. 190 191 This function calculates the zakat amount due on a given asset value over one lunar year. 192 Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth 193 that exceeds a certain threshold (Nisab). 194 195 Parameters: 196 x: The total value of the asset on which Zakat is to be calculated. 197 198 Returns: 199 The amount of Zakat due on the asset, calculated as 2.5% of the asset's value. 200 """ 201 return 0.025 * x # Zakat Cut in one Lunar Year 202 203 @staticmethod 204 def TimeCycle(days: int = 355) -> int: 205 """ 206 Calculates the approximate duration of a lunar year in nanoseconds. 207 208 This function calculates the approximate duration of a lunar year based on the given number of days. 209 It converts the given number of days into nanoseconds for use in high-precision timing applications. 210 211 Parameters: 212 days: The number of days in a lunar year. Defaults to 355, 213 which is an approximation of the average length of a lunar year. 214 215 Returns: 216 The approximate duration of a lunar year in nanoseconds. 217 """ 218 return int(60 * 60 * 24 * days * 1e9) # Lunar Year in nanoseconds 219 220 @staticmethod 221 def Nisab(gram_price: float, gram_quantity: float = 595) -> float: 222 """ 223 Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram. 224 225 This function calculates the Nisab value, which is the minimum threshold of wealth, 226 that makes an individual liable for paying Zakat. 227 The Nisab value is determined by the equivalent value of a specific amount 228 of gold or silver (currently 595 grams in silver) in the local currency. 229 230 Parameters: 231 - gram_price (float): The price per gram of Nisab. 232 - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver. 233 234 Returns: 235 - float: The total value of Nisab based on the given price per gram. 236 """ 237 return gram_price * gram_quantity 238 239 def __init__(self, db_path: str = "zakat.pickle", history_mode: bool = True): 240 """ 241 Initialize ZakatTracker with database path and history mode. 242 243 Parameters: 244 db_path (str): The path to the database file. Default is "zakat.pickle". 245 history_mode (bool): The mode for tracking history. Default is True. 246 247 Returns: 248 None 249 """ 250 self._vault_path = None 251 self._vault = None 252 self.reset() 253 self._history(history_mode) 254 self.path(db_path) 255 self.load() 256 257 def path(self, path: str = None) -> str: 258 """ 259 Set or get the database path. 260 261 Parameters: 262 path (str): The path to the database file. If not provided, it returns the current path. 263 264 Returns: 265 str: The current database path. 266 """ 267 if path is not None: 268 self._vault_path = path 269 return self._vault_path 270 271 @staticmethod 272 def scale(x: float | int | Decimal, decimal_places: int = 2) -> int: 273 """ 274 Scales a numerical value by a specified power of 10, returning an integer. 275 276 This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and 277 facilitate precise scaling operations, particularly useful in financial or scientific calculations. 278 279 Parameters: 280 x: The numeric value to scale. Can be a floating-point number, integer, or decimal. 281 decimal_places: The exponent for the scaling factor (10**y). Defaults to 2, meaning the input is scaled 282 by a factor of 100 (e.g., converts 1.23 to 123). 283 284 Returns: 285 The scaled value, rounded to the nearest integer. 286 287 Raises: 288 TypeError: If the input `x` is not a valid numeric type. 289 290 Examples: 291 >>> scale(3.14159) 292 314 293 >>> scale(1234, decimal_places=3) 294 1234000 295 >>> scale(Decimal("0.005"), decimal_places=4) 296 50 297 """ 298 if not isinstance(x, (float, int, Decimal)): 299 raise TypeError("Input 'x' must be a float, int, or Decimal.") 300 return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places)) 301 302 @staticmethod 303 def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal: 304 """ 305 Unscales an integer by a power of 10. 306 307 Parameters: 308 x: The integer to unscale. 309 return_type: The desired type for the returned value. Can be float, int, or Decimal. Defaults to float. 310 decimal_places: The power of 10 to use. Defaults to 2. 311 312 Returns: 313 The unscaled number, converted to the specified return_type. 314 315 Raises: 316 TypeError: If the return_type is not float or Decimal. 317 """ 318 if return_type not in (float, Decimal): 319 raise TypeError(f'Invalid return_type({return_type}). Supported types are float, int, and Decimal.') 320 return round(return_type(x / (10 ** decimal_places)), decimal_places) 321 322 def _history(self, status: bool = None) -> bool: 323 """ 324 Enable or disable history tracking. 325 326 Parameters: 327 status (bool): The status of history tracking. Default is True. 328 329 Returns: 330 None 331 """ 332 if status is not None: 333 self._history_mode = status 334 return self._history_mode 335 336 def reset(self) -> None: 337 """ 338 Reset the internal data structure to its initial state. 339 340 Parameters: 341 None 342 343 Returns: 344 None 345 """ 346 self._vault = { 347 'account': {}, 348 'exchange': {}, 349 'history': {}, 350 'lock': None, 351 'report': {}, 352 } 353 354 @staticmethod 355 def time(now: datetime = None) -> int: 356 """ 357 Generates a timestamp based on the provided datetime object or the current datetime. 358 359 Parameters: 360 now (datetime, optional): The datetime object to generate the timestamp from. 361 If not provided, the current datetime is used. 362 363 Returns: 364 int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970), 365 before 1970 will return in negative until 1000AD. 366 """ 367 if now is None: 368 now = datetime.datetime.now() 369 ordinal_day = now.toordinal() 370 ns_in_day = (now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() * 10 ** 9 371 return int((ordinal_day - 719_163) * 86_400_000_000_000 + ns_in_day) 372 373 @staticmethod 374 def time_to_datetime(ordinal_ns: int) -> datetime: 375 """ 376 Converts an ordinal number (number of days since 1000-01-01) to a datetime object. 377 378 Parameters: 379 ordinal_ns (int): The ordinal number of days since 1000-01-01. 380 381 Returns: 382 datetime: The corresponding datetime object. 383 """ 384 ordinal_day = ordinal_ns // 86_400_000_000_000 + 719_163 385 ns_in_day = ordinal_ns % 86_400_000_000_000 386 d = datetime.datetime.fromordinal(ordinal_day) 387 t = datetime.timedelta(seconds=ns_in_day // 10 ** 9) 388 return datetime.datetime.combine(d, datetime.time()) + t 389 390 def clean_history(self, lock: int | None = None) -> int: 391 """ 392 Cleans up the history of actions performed on the ZakatTracker instance. 393 394 Parameters: 395 lock (int, optional): The lock ID is used to clean up the empty history. 396 If not provided, it cleans up the empty history records for all locks. 397 398 Returns: 399 int: The number of locks cleaned up. 400 """ 401 count = 0 402 if lock in self._vault['history']: 403 if len(self._vault['history'][lock]) <= 0: 404 count += 1 405 del self._vault['history'][lock] 406 return count 407 self.free(self.lock()) 408 for lock in self._vault['history']: 409 if len(self._vault['history'][lock]) <= 0: 410 count += 1 411 del self._vault['history'][lock] 412 return count 413 414 def _step(self, action: Action = None, account=None, ref: int = None, file: int = None, value: float = None, 415 key: str = None, math_operation: MathOperation = None) -> int: 416 """ 417 This method is responsible for recording the actions performed on the ZakatTracker. 418 419 Parameters: 420 - action (Action): The type of action performed. 421 - account (str): The account number on which the action was performed. 422 - ref (int): The reference number of the action. 423 - file (int): The file reference number of the action. 424 - value (int): The value associated with the action. 425 - key (str): The key associated with the action. 426 - math_operation (MathOperation): The mathematical operation performed during the action. 427 428 Returns: 429 - int: The lock time of the recorded action. If no lock was performed, it returns 0. 430 """ 431 if not self._history(): 432 return 0 433 lock = self._vault['lock'] 434 if self.nolock(): 435 lock = self._vault['lock'] = self.time() 436 self._vault['history'][lock] = [] 437 if action is None: 438 return lock 439 self._vault['history'][lock].append({ 440 'action': action, 441 'account': account, 442 'ref': ref, 443 'file': file, 444 'key': key, 445 'value': value, 446 'math': math_operation, 447 }) 448 return lock 449 450 def nolock(self) -> bool: 451 """ 452 Check if the vault lock is currently not set. 453 454 Returns: 455 bool: True if the vault lock is not set, False otherwise. 456 """ 457 return self._vault['lock'] is None 458 459 def lock(self) -> int: 460 """ 461 Acquires a lock on the ZakatTracker instance. 462 463 Returns: 464 int: The lock ID. This ID can be used to release the lock later. 465 """ 466 return self._step() 467 468 def vault(self) -> dict: 469 """ 470 Returns a copy of the internal vault dictionary. 471 472 This method is used to retrieve the current state of the ZakatTracker object. 473 It provides a snapshot of the internal data structure, allowing for further 474 processing or analysis. 475 476 Returns: 477 dict: A copy of the internal vault dictionary. 478 """ 479 return self._vault.copy() 480 481 def stats(self) -> dict[str, tuple]: 482 """ 483 Calculates and returns statistics about the object's data storage. 484 485 This method determines the size of the database file on disk and the 486 size of the data currently held in RAM (likely within a dictionary). 487 Both sizes are reported in bytes and in a human-readable format 488 (e.g., KB, MB). 489 490 Returns: 491 dict[str, tuple]: A dictionary containing the following statistics: 492 493 * 'database': A tuple with two elements: 494 - The database file size in bytes (int). 495 - The database file size in human-readable format (str). 496 * 'ram': A tuple with two elements: 497 - The RAM usage (dictionary size) in bytes (int). 498 - The RAM usage in human-readable format (str). 499 500 Example: 501 >>> stats = my_object.stats() 502 >>> print(stats['database']) 503 (256000, '250.0 KB') 504 >>> print(stats['ram']) 505 (12345, '12.1 KB') 506 """ 507 ram_size = self.get_dict_size(self.vault()) 508 file_size = os.path.getsize(self.path()) 509 return { 510 'database': (file_size, self.human_readable_size(file_size)), 511 'ram': (ram_size, self.human_readable_size(ram_size)), 512 } 513 514 def steps(self) -> dict: 515 """ 516 Returns a copy of the history of steps taken in the ZakatTracker. 517 518 The history is a dictionary where each key is a unique identifier for a step, 519 and the corresponding value is a dictionary containing information about the step. 520 521 Returns: 522 dict: A copy of the history of steps taken in the ZakatTracker. 523 """ 524 return self._vault['history'].copy() 525 526 def free(self, lock: int, auto_save: bool = True) -> bool: 527 """ 528 Releases the lock on the database. 529 530 Parameters: 531 lock (int): The lock ID to be released. 532 auto_save (bool): Whether to automatically save the database after releasing the lock. 533 534 Returns: 535 bool: True if the lock is successfully released and (optionally) saved, False otherwise. 536 """ 537 if lock == self._vault['lock']: 538 self._vault['lock'] = None 539 self.clean_history(lock) 540 if auto_save: 541 return self.save(self.path()) 542 return True 543 return False 544 545 def account_exists(self, account) -> bool: 546 """ 547 Check if the given account exists in the vault. 548 549 Parameters: 550 account (str): The account number to check. 551 552 Returns: 553 bool: True if the account exists, False otherwise. 554 """ 555 return account in self._vault['account'] 556 557 def box_size(self, account) -> int: 558 """ 559 Calculate the size of the box for a specific account. 560 561 Parameters: 562 account (str): The account number for which the box size needs to be calculated. 563 564 Returns: 565 int: The size of the box for the given account. If the account does not exist, -1 is returned. 566 """ 567 if self.account_exists(account): 568 return len(self._vault['account'][account]['box']) 569 return -1 570 571 def log_size(self, account) -> int: 572 """ 573 Get the size of the log for a specific account. 574 575 Parameters: 576 account (str): The account number for which the log size needs to be calculated. 577 578 Returns: 579 int: The size of the log for the given account. If the account does not exist, -1 is returned. 580 """ 581 if self.account_exists(account): 582 return len(self._vault['account'][account]['log']) 583 return -1 584 585 def recall(self, dry=True, debug=False) -> bool: 586 """ 587 Revert the last operation. 588 589 Parameters: 590 dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. 591 debug (bool): If True, the function will print debug information. Default is False. 592 593 Returns: 594 bool: True if the operation was successful, False otherwise. 595 """ 596 if not self.nolock() or len(self._vault['history']) == 0: 597 return False 598 if len(self._vault['history']) <= 0: 599 return False 600 ref = sorted(self._vault['history'].keys())[-1] 601 if debug: 602 print('recall', ref) 603 memory = self._vault['history'][ref] 604 if debug: 605 print(type(memory), 'memory', memory) 606 607 limit = len(memory) + 1 608 sub_positive_log_negative = 0 609 for i in range(-1, -limit, -1): 610 x = memory[i] 611 if debug: 612 print(type(x), x) 613 match x['action']: 614 case Action.CREATE: 615 if x['account'] is not None: 616 if self.account_exists(x['account']): 617 if debug: 618 print('account', self._vault['account'][x['account']]) 619 assert len(self._vault['account'][x['account']]['box']) == 0 620 assert self._vault['account'][x['account']]['balance'] == 0 621 assert self._vault['account'][x['account']]['count'] == 0 622 if dry: 623 continue 624 del self._vault['account'][x['account']] 625 626 case Action.TRACK: 627 if x['account'] is not None: 628 if self.account_exists(x['account']): 629 if dry: 630 continue 631 self._vault['account'][x['account']]['balance'] -= x['value'] 632 self._vault['account'][x['account']]['count'] -= 1 633 del self._vault['account'][x['account']]['box'][x['ref']] 634 635 case Action.LOG: 636 if x['account'] is not None: 637 if self.account_exists(x['account']): 638 if x['ref'] in self._vault['account'][x['account']]['log']: 639 if dry: 640 continue 641 if sub_positive_log_negative == -x['value']: 642 self._vault['account'][x['account']]['count'] -= 1 643 sub_positive_log_negative = 0 644 box_ref = self._vault['account'][x['account']]['log'][x['ref']]['ref'] 645 if not box_ref is None: 646 assert self.box_exists(x['account'], box_ref) 647 box_value = self._vault['account'][x['account']]['log'][x['ref']]['value'] 648 assert box_value < 0 649 self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value 650 self._vault['account'][x['account']]['balance'] += -box_value 651 self._vault['account'][x['account']]['count'] -= 1 652 del self._vault['account'][x['account']]['log'][x['ref']] 653 654 case Action.SUB: 655 if x['account'] is not None: 656 if self.account_exists(x['account']): 657 if x['ref'] in self._vault['account'][x['account']]['box']: 658 if dry: 659 continue 660 self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value'] 661 self._vault['account'][x['account']]['balance'] += x['value'] 662 sub_positive_log_negative = x['value'] 663 664 case Action.ADD_FILE: 665 if x['account'] is not None: 666 if self.account_exists(x['account']): 667 if x['ref'] in self._vault['account'][x['account']]['log']: 668 if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']: 669 if dry: 670 continue 671 del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] 672 673 case Action.REMOVE_FILE: 674 if x['account'] is not None: 675 if self.account_exists(x['account']): 676 if x['ref'] in self._vault['account'][x['account']]['log']: 677 if dry: 678 continue 679 self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value'] 680 681 case Action.BOX_TRANSFER: 682 if x['account'] is not None: 683 if self.account_exists(x['account']): 684 if x['ref'] in self._vault['account'][x['account']]['box']: 685 if dry: 686 continue 687 self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value'] 688 689 case Action.EXCHANGE: 690 if x['account'] is not None: 691 if x['account'] in self._vault['exchange']: 692 if x['ref'] in self._vault['exchange'][x['account']]: 693 if dry: 694 continue 695 del self._vault['exchange'][x['account']][x['ref']] 696 697 case Action.REPORT: 698 if x['ref'] in self._vault['report']: 699 if dry: 700 continue 701 del self._vault['report'][x['ref']] 702 703 case Action.ZAKAT: 704 if x['account'] is not None: 705 if self.account_exists(x['account']): 706 if x['ref'] in self._vault['account'][x['account']]['box']: 707 if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]: 708 if dry: 709 continue 710 match x['math']: 711 case MathOperation.ADDITION: 712 self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x[ 713 'value'] 714 case MathOperation.EQUAL: 715 self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value'] 716 case MathOperation.SUBTRACTION: 717 self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x[ 718 'value'] 719 720 if not dry: 721 del self._vault['history'][ref] 722 return True 723 724 def ref_exists(self, account: str, ref_type: str, ref: int) -> bool: 725 """ 726 Check if a specific reference (transaction) exists in the vault for a given account and reference type. 727 728 Parameters: 729 account (str): The account number for which to check the existence of the reference. 730 ref_type (str): The type of reference (e.g., 'box', 'log', etc.). 731 ref (int): The reference (transaction) number to check for existence. 732 733 Returns: 734 bool: True if the reference exists for the given account and reference type, False otherwise. 735 """ 736 if account in self._vault['account']: 737 return ref in self._vault['account'][account][ref_type] 738 return False 739 740 def box_exists(self, account: str, ref: int) -> bool: 741 """ 742 Check if a specific box (transaction) exists in the vault for a given account and reference. 743 744 Parameters: 745 - account (str): The account number for which to check the existence of the box. 746 - ref (int): The reference (transaction) number to check for existence. 747 748 Returns: 749 - bool: True if the box exists for the given account and reference, False otherwise. 750 """ 751 return self.ref_exists(account, 'box', ref) 752 753 def track(self, value: float = 0, desc: str = '', account: str = 1, logging: bool = True, created: int = None, 754 debug: bool = False) -> int: 755 """ 756 This function tracks a transaction for a specific account. 757 758 Parameters: 759 value (float): The value of the transaction. Default is 0. 760 desc (str): The description of the transaction. Default is an empty string. 761 account (str): The account for which the transaction is being tracked. Default is '1'. 762 logging (bool): Whether to log the transaction. Default is True. 763 created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. 764 debug (bool): Whether to print debug information. Default is False. 765 766 Returns: 767 int: The timestamp of the transaction. 768 769 This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box. 770 771 Raises: 772 ValueError: The log transaction happened again in the same nanosecond time. 773 ValueError: The box transaction happened again in the same nanosecond time. 774 """ 775 if debug: 776 print('track', f'debug={debug}') 777 if created is None: 778 created = self.time() 779 no_lock = self.nolock() 780 self.lock() 781 if not self.account_exists(account): 782 if debug: 783 print(f"account {account} created") 784 self._vault['account'][account] = { 785 'balance': 0, 786 'box': {}, 787 'count': 0, 788 'log': {}, 789 'hide': False, 790 'zakatable': True, 791 } 792 self._step(Action.CREATE, account) 793 if value == 0: 794 if no_lock: 795 self.free(self.lock()) 796 return 0 797 if logging: 798 self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug) 799 if debug: 800 print('create-box', created) 801 if self.box_exists(account, created): 802 raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).") 803 if debug: 804 print('created-box', created) 805 self._vault['account'][account]['box'][created] = { 806 'capital': value, 807 'count': 0, 808 'last': 0, 809 'rest': value, 810 'total': 0, 811 } 812 self._step(Action.TRACK, account, ref=created, value=value) 813 if no_lock: 814 self.free(self.lock()) 815 return created 816 817 def log_exists(self, account: str, ref: int) -> bool: 818 """ 819 Checks if a specific transaction log entry exists for a given account. 820 821 Parameters: 822 account (str): The account number associated with the transaction log. 823 ref (int): The reference to the transaction log entry. 824 825 Returns: 826 bool: True if the transaction log entry exists, False otherwise. 827 """ 828 return self.ref_exists(account, 'log', ref) 829 830 def _log(self, value: float, desc: str = '', account: str = 1, created: int = None, ref: int = None, 831 debug: bool = False) -> int: 832 """ 833 Log a transaction into the account's log. 834 835 Parameters: 836 value (float): The value of the transaction. 837 desc (str): The description of the transaction. 838 account (str): The account to log the transaction into. Default is '1'. 839 created (int): The timestamp of the transaction. If not provided, it will be generated. 840 841 Returns: 842 int: The timestamp of the logged transaction. 843 844 This method updates the account's balance, count, and log with the transaction details. 845 It also creates a step in the history of the transaction. 846 847 Raises: 848 ValueError: The log transaction happened again in the same nanosecond time. 849 """ 850 if debug: 851 print('_log', f'debug={debug}') 852 if created is None: 853 created = self.time() 854 try: 855 self._vault['account'][account]['balance'] += value 856 except TypeError: 857 self._vault['account'][account]['balance'] += Decimal(value) 858 self._vault['account'][account]['count'] += 1 859 if debug: 860 print('create-log', created) 861 if self.log_exists(account, created): 862 raise ValueError(f"The log transaction happened again in the same nanosecond time({created}).") 863 if debug: 864 print('created-log', created) 865 self._vault['account'][account]['log'][created] = { 866 'value': value, 867 'desc': desc, 868 'ref': ref, 869 'file': {}, 870 } 871 self._step(Action.LOG, account, ref=created, value=value) 872 return created 873 874 def exchange(self, account, created: int = None, rate: float = None, description: str = None, 875 debug: bool = False) -> dict: 876 """ 877 This method is used to record or retrieve exchange rates for a specific account. 878 879 Parameters: 880 - account (str): The account number for which the exchange rate is being recorded or retrieved. 881 - created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used. 882 - rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate. 883 - description (str): A description of the exchange rate. 884 885 Returns: 886 - dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, 887 it returns a dictionary with default values for the rate and description. 888 """ 889 if debug: 890 print('exchange', f'debug={debug}') 891 if created is None: 892 created = self.time() 893 no_lock = self.nolock() 894 self.lock() 895 if rate is not None: 896 if rate <= 0: 897 return dict() 898 if account not in self._vault['exchange']: 899 self._vault['exchange'][account] = {} 900 if len(self._vault['exchange'][account]) == 0 and rate <= 1: 901 return {"time": created, "rate": 1, "description": None} 902 self._vault['exchange'][account][created] = {"rate": rate, "description": description} 903 self._step(Action.EXCHANGE, account, ref=created, value=rate) 904 if no_lock: 905 self.free(self.lock()) 906 if debug: 907 print("exchange-created-1", 908 f'account: {account}, created: {created}, rate:{rate}, description:{description}') 909 910 if account in self._vault['exchange']: 911 valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created] 912 if valid_rates: 913 latest_rate = max(valid_rates, key=lambda x: x[0]) 914 if debug: 915 print("exchange-read-1", 916 f'account: {account}, created: {created}, rate:{rate}, description:{description}', 917 'latest_rate', latest_rate) 918 result = latest_rate[1] 919 result['time'] = latest_rate[0] 920 return result # إرجاع قاموس يحتوي على المعدل والوصف 921 if debug: 922 print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}') 923 return {"time": created, "rate": 1, "description": None} # إرجاع القيمة الافتراضية مع وصف فارغ 924 925 @staticmethod 926 def exchange_calc(x: float, x_rate: float, y_rate: float) -> float: 927 """ 928 This function calculates the exchanged amount of a currency. 929 930 Args: 931 x (float): The original amount of the currency. 932 x_rate (float): The exchange rate of the original currency. 933 y_rate (float): The exchange rate of the target currency. 934 935 Returns: 936 float: The exchanged amount of the target currency. 937 """ 938 return (x * x_rate) / y_rate 939 940 def exchanges(self) -> dict: 941 """ 942 Retrieve the recorded exchange rates for all accounts. 943 944 Parameters: 945 None 946 947 Returns: 948 dict: A dictionary containing all recorded exchange rates. 949 The keys are account names or numbers, and the values are dictionaries containing the exchange rates. 950 Each exchange rate dictionary has timestamps as keys and exchange rate details as values. 951 """ 952 return self._vault['exchange'].copy() 953 954 def accounts(self) -> dict: 955 """ 956 Returns a dictionary containing account numbers as keys and their respective balances as values. 957 958 Parameters: 959 None 960 961 Returns: 962 dict: A dictionary where keys are account numbers and values are their respective balances. 963 """ 964 result = {} 965 for i in self._vault['account']: 966 result[i] = self._vault['account'][i]['balance'] 967 return result 968 969 def boxes(self, account) -> dict: 970 """ 971 Retrieve the boxes (transactions) associated with a specific account. 972 973 Parameters: 974 account (str): The account number for which to retrieve the boxes. 975 976 Returns: 977 dict: A dictionary containing the boxes associated with the given account. 978 If the account does not exist, an empty dictionary is returned. 979 """ 980 if self.account_exists(account): 981 return self._vault['account'][account]['box'] 982 return {} 983 984 def logs(self, account) -> dict: 985 """ 986 Retrieve the logs (transactions) associated with a specific account. 987 988 Parameters: 989 account (str): The account number for which to retrieve the logs. 990 991 Returns: 992 dict: A dictionary containing the logs associated with the given account. 993 If the account does not exist, an empty dictionary is returned. 994 """ 995 if self.account_exists(account): 996 return self._vault['account'][account]['log'] 997 return {} 998 999 def add_file(self, account: str, ref: int, path: str) -> int: 1000 """ 1001 Adds a file reference to a specific transaction log entry in the vault. 1002 1003 Parameters: 1004 account (str): The account number associated with the transaction log. 1005 ref (int): The reference to the transaction log entry. 1006 path (str): The path of the file to be added. 1007 1008 Returns: 1009 int: The reference of the added file. If the account or transaction log entry does not exist, returns 0. 1010 """ 1011 if self.account_exists(account): 1012 if ref in self._vault['account'][account]['log']: 1013 file_ref = self.time() 1014 self._vault['account'][account]['log'][ref]['file'][file_ref] = path 1015 no_lock = self.nolock() 1016 self.lock() 1017 self._step(Action.ADD_FILE, account, ref=ref, file=file_ref) 1018 if no_lock: 1019 self.free(self.lock()) 1020 return file_ref 1021 return 0 1022 1023 def remove_file(self, account: str, ref: int, file_ref: int) -> bool: 1024 """ 1025 Removes a file reference from a specific transaction log entry in the vault. 1026 1027 Parameters: 1028 account (str): The account number associated with the transaction log. 1029 ref (int): The reference to the transaction log entry. 1030 file_ref (int): The reference of the file to be removed. 1031 1032 Returns: 1033 bool: True if the file reference is successfully removed, False otherwise. 1034 """ 1035 if self.account_exists(account): 1036 if ref in self._vault['account'][account]['log']: 1037 if file_ref in self._vault['account'][account]['log'][ref]['file']: 1038 x = self._vault['account'][account]['log'][ref]['file'][file_ref] 1039 del self._vault['account'][account]['log'][ref]['file'][file_ref] 1040 no_lock = self.nolock() 1041 self.lock() 1042 self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x) 1043 if no_lock: 1044 self.free(self.lock()) 1045 return True 1046 return False 1047 1048 def balance(self, account: str = 1, cached: bool = True) -> int: 1049 """ 1050 Calculate and return the balance of a specific account. 1051 1052 Parameters: 1053 account (str): The account number. Default is '1'. 1054 cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True. 1055 1056 Returns: 1057 int: The balance of the account. 1058 1059 Note: 1060 If cached is True, the function returns the cached balance. 1061 If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items. 1062 """ 1063 if cached: 1064 return self._vault['account'][account]['balance'] 1065 x = 0 1066 return [x := x + y['rest'] for y in self._vault['account'][account]['box'].values()][-1] 1067 1068 def hide(self, account, status: bool = None) -> bool: 1069 """ 1070 Check or set the hide status of a specific account. 1071 1072 Parameters: 1073 account (str): The account number. 1074 status (bool, optional): The new hide status. If not provided, the function will return the current status. 1075 1076 Returns: 1077 bool: The current or updated hide status of the account. 1078 1079 Raises: 1080 None 1081 1082 Example: 1083 >>> tracker = ZakatTracker() 1084 >>> ref = tracker.track(51, 'desc', 'account1') 1085 >>> tracker.hide('account1') # Set the hide status of 'account1' to True 1086 False 1087 >>> tracker.hide('account1', True) # Set the hide status of 'account1' to True 1088 True 1089 >>> tracker.hide('account1') # Get the hide status of 'account1' by default 1090 True 1091 >>> tracker.hide('account1', False) 1092 False 1093 """ 1094 if self.account_exists(account): 1095 if status is None: 1096 return self._vault['account'][account]['hide'] 1097 self._vault['account'][account]['hide'] = status 1098 return status 1099 return False 1100 1101 def zakatable(self, account, status: bool = None) -> bool: 1102 """ 1103 Check or set the zakatable status of a specific account. 1104 1105 Parameters: 1106 account (str): The account number. 1107 status (bool, optional): The new zakatable status. If not provided, the function will return the current status. 1108 1109 Returns: 1110 bool: The current or updated zakatable status of the account. 1111 1112 Raises: 1113 None 1114 1115 Example: 1116 >>> tracker = ZakatTracker() 1117 >>> ref = tracker.track(51, 'desc', 'account1') 1118 >>> tracker.zakatable('account1') # Set the zakatable status of 'account1' to True 1119 True 1120 >>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True 1121 True 1122 >>> tracker.zakatable('account1') # Get the zakatable status of 'account1' by default 1123 True 1124 >>> tracker.zakatable('account1', False) 1125 False 1126 """ 1127 if self.account_exists(account): 1128 if status is None: 1129 return self._vault['account'][account]['zakatable'] 1130 self._vault['account'][account]['zakatable'] = status 1131 return status 1132 return False 1133 1134 def sub(self, x: float, desc: str = '', account: str = 1, created: int = None, debug: bool = False) -> tuple: 1135 """ 1136 Subtracts a specified value from an account's balance. 1137 1138 Parameters: 1139 x (float): The amount to be subtracted. 1140 desc (str): A description for the transaction. Defaults to an empty string. 1141 account (str): The account from which the value will be subtracted. Defaults to '1'. 1142 created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. 1143 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1144 1145 Returns: 1146 tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction. 1147 1148 If the amount to subtract is greater than the account's balance, 1149 the remaining amount will be transferred to a new transaction with a negative value. 1150 1151 Raises: 1152 ValueError: The box transaction happened again in the same nanosecond time. 1153 ValueError: The log transaction happened again in the same nanosecond time. 1154 """ 1155 if debug: 1156 print('sub', f'debug={debug}') 1157 if x < 0: 1158 return tuple() 1159 if x == 0: 1160 ref = self.track(x, '', account) 1161 return ref, ref 1162 if created is None: 1163 created = self.time() 1164 no_lock = self.nolock() 1165 self.lock() 1166 self.track(0, '', account) 1167 self._log(value=-x, desc=desc, account=account, created=created, ref=None, debug=debug) 1168 ids = sorted(self._vault['account'][account]['box'].keys()) 1169 limit = len(ids) + 1 1170 target = x 1171 if debug: 1172 print('ids', ids) 1173 ages = [] 1174 for i in range(-1, -limit, -1): 1175 if target == 0: 1176 break 1177 j = ids[i] 1178 if debug: 1179 print('i', i, 'j', j) 1180 rest = self._vault['account'][account]['box'][j]['rest'] 1181 if rest >= target: 1182 self._vault['account'][account]['box'][j]['rest'] -= target 1183 self._step(Action.SUB, account, ref=j, value=target) 1184 ages.append((j, target)) 1185 target = 0 1186 break 1187 elif target > rest > 0: 1188 chunk = rest 1189 target -= chunk 1190 self._step(Action.SUB, account, ref=j, value=chunk) 1191 ages.append((j, chunk)) 1192 self._vault['account'][account]['box'][j]['rest'] = 0 1193 if target > 0: 1194 self.track(-target, desc, account, False, created) 1195 ages.append((created, target)) 1196 if no_lock: 1197 self.free(self.lock()) 1198 return created, ages 1199 1200 def transfer(self, amount: int, from_account: str, to_account: str, desc: str = '', created: int = None, 1201 debug: bool = False) -> list[int]: 1202 """ 1203 Transfers a specified value from one account to another. 1204 1205 Parameters: 1206 amount (int): The amount to be transferred. 1207 from_account (str): The account from which the value will be transferred. 1208 to_account (str): The account to which the value will be transferred. 1209 desc (str, optional): A description for the transaction. Defaults to an empty string. 1210 created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. 1211 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1212 1213 Returns: 1214 list[int]: A list of timestamps corresponding to the transactions made during the transfer. 1215 1216 Raises: 1217 ValueError: Transfer to the same account is forbidden. 1218 ValueError: The box transaction happened again in the same nanosecond time. 1219 ValueError: The log transaction happened again in the same nanosecond time. 1220 """ 1221 if debug: 1222 print('transfer', f'debug={debug}') 1223 if from_account == to_account: 1224 raise ValueError(f'Transfer to the same account is forbidden. {to_account}') 1225 if amount <= 0: 1226 return [] 1227 if created is None: 1228 created = self.time() 1229 (_, ages) = self.sub(amount, desc, from_account, created, debug=debug) 1230 times = [] 1231 source_exchange = self.exchange(from_account, created) 1232 target_exchange = self.exchange(to_account, created) 1233 1234 if debug: 1235 print('ages', ages) 1236 1237 for age, value in ages: 1238 target_amount = self.exchange_calc(value, source_exchange['rate'], target_exchange['rate']) 1239 # Perform the transfer 1240 if self.box_exists(to_account, age): 1241 if debug: 1242 print('box_exists', age) 1243 capital = self._vault['account'][to_account]['box'][age]['capital'] 1244 rest = self._vault['account'][to_account]['box'][age]['rest'] 1245 if debug: 1246 print( 1247 f"Transfer {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1248 selected_age = age 1249 if rest + target_amount > capital: 1250 self._vault['account'][to_account]['box'][age]['capital'] += target_amount 1251 selected_age = ZakatTracker.time() 1252 self._vault['account'][to_account]['box'][age]['rest'] += target_amount 1253 self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount) 1254 y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account, 1255 created=None, ref=None, debug=debug) 1256 times.append((age, y)) 1257 continue 1258 y = self.track(target_amount, desc, to_account, logging=True, created=age, debug=debug) 1259 if debug: 1260 print( 1261 f"Transferred {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1262 times.append(y) 1263 return times 1264 1265 def check(self, silver_gram_price: float, nisab: float = None, debug: bool = False, now: int = None, 1266 cycle: float = None) -> tuple: 1267 """ 1268 Check the eligibility for Zakat based on the given parameters. 1269 1270 Parameters: 1271 silver_gram_price (float): The price of a gram of silver. 1272 nisab (float): The minimum amount of wealth required for Zakat. If not provided, 1273 it will be calculated based on the silver_gram_price. 1274 debug (bool): Flag to enable debug mode. 1275 now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). 1276 cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle(). 1277 1278 Returns: 1279 tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, 1280 and a dictionary containing the Zakat plan. 1281 """ 1282 if debug: 1283 print('check', f'debug={debug}') 1284 if now is None: 1285 now = self.time() 1286 if cycle is None: 1287 cycle = ZakatTracker.TimeCycle() 1288 if nisab is None: 1289 nisab = ZakatTracker.Nisab(silver_gram_price) 1290 plan = {} 1291 below_nisab = 0 1292 brief = [0, 0, 0] 1293 valid = False 1294 if debug: 1295 print('exchanges', self.exchanges()) 1296 for x in self._vault['account']: 1297 if not self.zakatable(x): 1298 continue 1299 _box = self._vault['account'][x]['box'] 1300 _log = self._vault['account'][x]['log'] 1301 limit = len(_box) + 1 1302 ids = sorted(self._vault['account'][x]['box'].keys()) 1303 for i in range(-1, -limit, -1): 1304 j = ids[i] 1305 rest = float(_box[j]['rest']) 1306 if rest <= 0: 1307 continue 1308 exchange = self.exchange(x, created=self.time()) 1309 rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1) 1310 brief[0] += rest 1311 index = limit + i - 1 1312 epoch = (now - j) / cycle 1313 if debug: 1314 print(f"Epoch: {epoch}", _box[j]) 1315 if _box[j]['last'] > 0: 1316 epoch = (now - _box[j]['last']) / cycle 1317 if debug: 1318 print(f"Epoch: {epoch}") 1319 epoch = floor(epoch) 1320 if debug: 1321 print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch) 1322 if epoch == 0: 1323 continue 1324 if debug: 1325 print("Epoch - PASSED") 1326 brief[1] += rest 1327 if rest >= nisab: 1328 total = 0 1329 for _ in range(epoch): 1330 total += ZakatTracker.ZakatCut(float(rest) - float(total)) 1331 if total > 0: 1332 if x not in plan: 1333 plan[x] = {} 1334 valid = True 1335 brief[2] += total 1336 plan[x][index] = { 1337 'total': total, 1338 'count': epoch, 1339 'box_time': j, 1340 'box_capital': _box[j]['capital'], 1341 'box_rest': _box[j]['rest'], 1342 'box_last': _box[j]['last'], 1343 'box_total': _box[j]['total'], 1344 'box_count': _box[j]['count'], 1345 'box_log': _log[j]['desc'], 1346 'exchange_rate': exchange['rate'], 1347 'exchange_time': exchange['time'], 1348 'exchange_desc': exchange['description'], 1349 } 1350 else: 1351 chunk = ZakatTracker.ZakatCut(float(rest)) 1352 if chunk > 0: 1353 if x not in plan: 1354 plan[x] = {} 1355 if j not in plan[x].keys(): 1356 plan[x][index] = {} 1357 below_nisab += rest 1358 brief[2] += chunk 1359 plan[x][index]['below_nisab'] = chunk 1360 plan[x][index]['total'] = chunk 1361 plan[x][index]['count'] = epoch 1362 plan[x][index]['box_time'] = j 1363 plan[x][index]['box_capital'] = _box[j]['capital'] 1364 plan[x][index]['box_rest'] = _box[j]['rest'] 1365 plan[x][index]['box_last'] = _box[j]['last'] 1366 plan[x][index]['box_total'] = _box[j]['total'] 1367 plan[x][index]['box_count'] = _box[j]['count'] 1368 plan[x][index]['box_log'] = _log[j]['desc'] 1369 plan[x][index]['exchange_rate'] = exchange['rate'] 1370 plan[x][index]['exchange_time'] = exchange['time'] 1371 plan[x][index]['exchange_desc'] = exchange['description'] 1372 valid = valid or below_nisab >= nisab 1373 if debug: 1374 print(f"below_nisab({below_nisab}) >= nisab({nisab})") 1375 return valid, brief, plan 1376 1377 def build_payment_parts(self, demand: float, positive_only: bool = True) -> dict: 1378 """ 1379 Build payment parts for the Zakat distribution. 1380 1381 Parameters: 1382 demand (float): The total demand for payment in local currency. 1383 positive_only (bool): If True, only consider accounts with positive balance. Default is True. 1384 1385 Returns: 1386 dict: A dictionary containing the payment parts for each account. The dictionary has the following structure: 1387 { 1388 'account': { 1389 'account_id': {'balance': float, 'rate': float, 'part': float}, 1390 ... 1391 }, 1392 'exceed': bool, 1393 'demand': float, 1394 'total': float, 1395 } 1396 """ 1397 total = 0 1398 parts = { 1399 'account': {}, 1400 'exceed': False, 1401 'demand': demand, 1402 } 1403 for x, y in self.accounts().items(): 1404 if positive_only and y <= 0: 1405 continue 1406 total += float(y) 1407 exchange = self.exchange(x) 1408 parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0} 1409 parts['total'] = total 1410 return parts 1411 1412 @staticmethod 1413 def check_payment_parts(parts: dict, debug: bool = False) -> int: 1414 """ 1415 Checks the validity of payment parts. 1416 1417 Parameters: 1418 parts (dict): A dictionary containing payment parts information. 1419 debug (bool): Flag to enable debug mode. 1420 1421 Returns: 1422 int: Returns 0 if the payment parts are valid, otherwise returns the error code. 1423 1424 Error Codes: 1425 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 1426 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 1427 3: 'part' value in parts['account'][x] is less than 0. 1428 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 1429 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 1430 6: The sum of 'part' values in parts['account'] does not match with 'demand' value. 1431 """ 1432 if debug: 1433 print('check_payment_parts', f'debug={debug}') 1434 for i in ['demand', 'account', 'total', 'exceed']: 1435 if i not in parts: 1436 return 1 1437 exceed = parts['exceed'] 1438 for x in parts['account']: 1439 for j in ['balance', 'rate', 'part']: 1440 if j not in parts['account'][x]: 1441 return 2 1442 if parts['account'][x]['part'] < 0: 1443 return 3 1444 if not exceed and parts['account'][x]['balance'] <= 0: 1445 return 4 1446 demand = parts['demand'] 1447 z = 0 1448 for _, y in parts['account'].items(): 1449 if not exceed and y['part'] > y['balance']: 1450 return 5 1451 z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1) 1452 z = round(z, 2) 1453 demand = round(demand, 2) 1454 if debug: 1455 print('check_payment_parts', f'z = {z}, demand = {demand}') 1456 print('check_payment_parts', type(z), type(demand)) 1457 print('check_payment_parts', z != demand) 1458 print('check_payment_parts', str(z) != str(demand)) 1459 if z != demand and str(z) != str(demand): 1460 return 6 1461 return 0 1462 1463 def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool: 1464 """ 1465 Perform Zakat calculation based on the given report and optional parts. 1466 1467 Parameters: 1468 report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. 1469 parts (dict): A dictionary containing the payment parts for the zakat. 1470 debug (bool): A flag indicating whether to print debug information. 1471 1472 Returns: 1473 bool: True if the zakat calculation is successful, False otherwise. 1474 """ 1475 if debug: 1476 print('zakat', f'debug={debug}') 1477 valid, _, plan = report 1478 if not valid: 1479 return valid 1480 parts_exist = parts is not None 1481 if parts_exist: 1482 if self.check_payment_parts(parts, debug=debug) != 0: 1483 return False 1484 if debug: 1485 print('######### zakat #######') 1486 print('parts_exist', parts_exist) 1487 no_lock = self.nolock() 1488 self.lock() 1489 report_time = self.time() 1490 self._vault['report'][report_time] = report 1491 self._step(Action.REPORT, ref=report_time) 1492 created = self.time() 1493 for x in plan: 1494 target_exchange = self.exchange(x) 1495 if debug: 1496 print(plan[x]) 1497 print('-------------') 1498 print(self._vault['account'][x]['box']) 1499 ids = sorted(self._vault['account'][x]['box'].keys()) 1500 if debug: 1501 print('plan[x]', plan[x]) 1502 for i in plan[x].keys(): 1503 j = ids[i] 1504 if debug: 1505 print('i', i, 'j', j) 1506 self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'], 1507 key='last', 1508 math_operation=MathOperation.EQUAL) 1509 self._vault['account'][x]['box'][j]['last'] = created 1510 amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate'])) 1511 self._vault['account'][x]['box'][j]['total'] += amount 1512 self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total', 1513 math_operation=MathOperation.ADDITION) 1514 self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count'] 1515 self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count', 1516 math_operation=MathOperation.ADDITION) 1517 if not parts_exist: 1518 try: 1519 self._vault['account'][x]['box'][j]['rest'] -= amount 1520 except TypeError: 1521 self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount) 1522 # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest', 1523 # math_operation=MathOperation.SUBTRACTION) 1524 self._log(-float(amount), desc='zakat-زكاة', account=x, created=None, ref=j, debug=debug) 1525 if parts_exist: 1526 for account, part in parts['account'].items(): 1527 if part['part'] == 0: 1528 continue 1529 if debug: 1530 print('zakat-part', account, part['rate']) 1531 target_exchange = self.exchange(account) 1532 amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate']) 1533 self.sub(amount, desc='zakat-part-دفعة-زكاة', account=account, debug=debug) 1534 if no_lock: 1535 self.free(self.lock()) 1536 return True 1537 1538 def export_json(self, path: str = "data.json") -> bool: 1539 """ 1540 Exports the current state of the ZakatTracker object to a JSON file. 1541 1542 Parameters: 1543 path (str): The path where the JSON file will be saved. Default is "data.json". 1544 1545 Returns: 1546 bool: True if the export is successful, False otherwise. 1547 1548 Raises: 1549 No specific exceptions are raised by this method. 1550 """ 1551 with open(path, "w") as file: 1552 json.dump(self._vault, file, indent=4, cls=JSONEncoder) 1553 return True 1554 1555 def save(self, path: str = None) -> bool: 1556 """ 1557 Saves the ZakatTracker's current state to a pickle file. 1558 1559 This method serializes the internal data (`_vault`) along with metadata 1560 (Python version, pickle protocol) for future compatibility. 1561 1562 Parameters: 1563 path (str, optional): File path for saving. Defaults to a predefined location. 1564 1565 Returns: 1566 bool: True if the save operation is successful, False otherwise. 1567 """ 1568 if path is None: 1569 path = self.path() 1570 with open(path, "wb") as f: 1571 version = f'{version_info.major}.{version_info.minor}.{version_info.micro}' 1572 pickle_protocol = pickle.HIGHEST_PROTOCOL 1573 data = { 1574 'python_version': version, 1575 'pickle_protocol': pickle_protocol, 1576 'data': self._vault, 1577 } 1578 pickle.dump(data, f, protocol=pickle_protocol) 1579 return True 1580 1581 def load(self, path: str = None) -> bool: 1582 """ 1583 Load the current state of the ZakatTracker object from a pickle file. 1584 1585 Parameters: 1586 path (str): The path where the pickle file is located. If not provided, it will use the default path. 1587 1588 Returns: 1589 bool: True if the load operation is successful, False otherwise. 1590 """ 1591 if path is None: 1592 path = self.path() 1593 if os.path.exists(path): 1594 with open(path, "rb") as f: 1595 data = pickle.load(f) 1596 self._vault = data['data'] 1597 return True 1598 return False 1599 1600 def import_csv_cache_path(self): 1601 """ 1602 Generates the cache file path for imported CSV data. 1603 1604 This function constructs the file path where cached data from CSV imports 1605 will be stored. The cache file is a pickle file (.pickle extension) appended 1606 to the base path of the object. 1607 1608 Returns: 1609 str: The full path to the import CSV cache file. 1610 1611 Example: 1612 >>> obj = ZakatTracker('/data/reports') 1613 >>> obj.import_csv_cache_path() 1614 '/data/reports.import_csv.pickle' 1615 """ 1616 path = self.path() 1617 if path.endswith(".pickle"): 1618 path = path[:-7] 1619 return path + '.import_csv.pickle' 1620 1621 def import_csv(self, path: str = 'file.csv', debug: bool = False) -> tuple: 1622 """ 1623 The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system. 1624 1625 Parameters: 1626 path (str): The path to the CSV file. Default is 'file.csv'. 1627 debug (bool): A flag indicating whether to print debug information. 1628 1629 Returns: 1630 tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, 1631 and a dictionary of bad transactions. 1632 1633 Notes: 1634 * Currency Pair Assumption: This function assumes that the exchange rates stored for each account 1635 are appropriate for the currency pairs involved in the conversions. 1636 * The exchange rate for each account is based on the last encountered transaction rate that is not equal 1637 to 1.0 or the previous rate for that account. 1638 * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent 1639 transactions of the same account within the whole imported and existing dataset when doing `check` and 1640 `zakat` operations. 1641 1642 Example Usage: 1643 The CSV file should have the following format, rate is optional per transaction: 1644 account, desc, value, date, rate 1645 For example: 1646 safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1 1647 """ 1648 if debug: 1649 print('import_csv', f'debug={debug}') 1650 cache: list[int] = [] 1651 try: 1652 with open(self.import_csv_cache_path(), "rb") as f: 1653 cache = pickle.load(f) 1654 except: 1655 pass 1656 date_formats = [ 1657 "%Y-%m-%d %H:%M:%S", 1658 "%Y-%m-%dT%H:%M:%S", 1659 "%Y-%m-%dT%H%M%S", 1660 "%Y-%m-%d", 1661 ] 1662 created, found, bad = 0, 0, {} 1663 data: dict[int, list] = {} 1664 with open(path, newline='', encoding="utf-8") as f: 1665 i = 0 1666 for row in csv.reader(f, delimiter=','): 1667 i += 1 1668 hashed = hash(tuple(row)) 1669 if hashed in cache: 1670 found += 1 1671 continue 1672 account = row[0] 1673 desc = row[1] 1674 value = float(row[2]) 1675 rate = 1.0 1676 if row[4:5]: # Empty list if index is out of range 1677 rate = float(row[4]) 1678 date: int = 0 1679 for time_format in date_formats: 1680 try: 1681 date = self.time(datetime.datetime.strptime(row[3], time_format)) 1682 break 1683 except: 1684 pass 1685 # TODO: not allowed for negative dates 1686 if date == 0 or value == 0: 1687 bad[i] = row 1688 continue 1689 if date not in data: 1690 data[date] = [] 1691 # TODO: If duplicated time with different accounts with the same amount it is an indicator of a transfer 1692 data[date].append((date, value, desc, account, rate, hashed)) 1693 1694 if debug: 1695 print('import_csv', len(data)) 1696 1697 def process(row, index=0): 1698 nonlocal created 1699 (date, value, desc, account, rate, hashed) = row 1700 date += index 1701 if rate > 1: 1702 self.exchange(account, created=date, rate=rate) 1703 if value > 0: 1704 self.track(value, desc, account, True, date) 1705 elif value < 0: 1706 self.sub(-value, desc, account, date) 1707 created += 1 1708 cache.append(hashed) 1709 1710 for date, rows in sorted(data.items()): 1711 len_rows = len(rows) 1712 if len_rows == 1: 1713 process(rows[0]) 1714 continue 1715 if debug: 1716 print('-- Duplicated time detected', date, 'len', len_rows) 1717 print(rows) 1718 print('---------------------------------') 1719 for index, row in enumerate(rows): 1720 process(row, index) 1721 with open(self.import_csv_cache_path(), "wb") as f: 1722 pickle.dump(cache, f) 1723 return created, found, bad 1724 1725 ######## 1726 # TESTS # 1727 ####### 1728 1729 @staticmethod 1730 def human_readable_size(size: float, decimal_places: int = 2) -> str: 1731 """ 1732 Converts a size in bytes to a human-readable format (e.g., KB, MB, GB). 1733 1734 This function iterates through progressively larger units of information 1735 (B, KB, MB, GB, etc.) and divides the input size until it fits within a 1736 range that can be expressed with a reasonable number before the unit. 1737 1738 Parameters: 1739 size (float): The size in bytes to convert. 1740 decimal_places (int, optional): The number of decimal places to display 1741 in the result. Defaults to 2. 1742 1743 Returns: 1744 str: A string representation of the size in a human-readable format, 1745 rounded to the specified number of decimal places. For example: 1746 - "1.50 KB" (1536 bytes) 1747 - "23.00 MB" (24117248 bytes) 1748 - "1.23 GB" (1325899906 bytes) 1749 """ 1750 if type(size) not in (float, int): 1751 raise TypeError("size must be a float or integer") 1752 if type(decimal_places) != int: 1753 raise TypeError("decimal_places must be an integer") 1754 for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: 1755 if size < 1024.0: 1756 break 1757 size /= 1024.0 1758 return f"{size:.{decimal_places}f} {unit}" 1759 1760 @staticmethod 1761 def get_dict_size(obj: dict, seen: set = None) -> float: 1762 """ 1763 Recursively calculates the approximate memory size of a dictionary and its contents in bytes. 1764 1765 This function traverses the dictionary structure, accounting for the size of keys, values, 1766 and any nested objects. It handles various data types commonly found in dictionaries 1767 (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case 1768 of circular references. 1769 1770 Parameters: 1771 obj (dict): The dictionary whose size is to be calculated. 1772 seen (set, optional): A set used internally to track visited objects 1773 and avoid circular references. Defaults to None. 1774 1775 Returns: 1776 float: An approximate size of the dictionary and its contents in bytes. 1777 1778 Note: 1779 - This function is a method of the `ZakatTracker` class and is likely used to 1780 estimate the memory footprint of data structures relevant to Zakat calculations. 1781 - The size calculation is approximate as it relies on `sys.getsizeof()`, which might 1782 not account for all memory overhead depending on the Python implementation. 1783 - Circular references are handled to prevent infinite recursion. 1784 - Basic numeric types (int, float, complex) are assumed to have fixed sizes. 1785 - String sizes are estimated based on character length and encoding. 1786 """ 1787 size = 0 1788 if seen is None: 1789 seen = set() 1790 1791 obj_id = id(obj) 1792 if obj_id in seen: 1793 return 0 1794 1795 seen.add(obj_id) 1796 size += sys.getsizeof(obj) 1797 1798 if isinstance(obj, dict): 1799 for k, v in obj.items(): 1800 size += ZakatTracker.get_dict_size(k, seen) 1801 size += ZakatTracker.get_dict_size(v, seen) 1802 elif isinstance(obj, (list, tuple, set, frozenset)): 1803 for item in obj: 1804 size += ZakatTracker.get_dict_size(item, seen) 1805 elif isinstance(obj, (int, float, complex)): # Handle numbers 1806 pass # Basic numbers have a fixed size, so nothing to add here 1807 elif isinstance(obj, str): # Handle strings 1808 size += len(obj) * sys.getsizeof(str().encode()) # Size per character in bytes 1809 return size 1810 1811 @staticmethod 1812 def duration_from_nanoseconds(ns: int, 1813 show_zeros_in_spoken_time: bool = False, 1814 spoken_time_separator=',', 1815 millennia: str = 'Millennia', 1816 century: str = 'Century', 1817 years: str = 'Years', 1818 days: str = 'Days', 1819 hours: str = 'Hours', 1820 minutes: str = 'Minutes', 1821 seconds: str = 'Seconds', 1822 milli_seconds: str = 'MilliSeconds', 1823 micro_seconds: str = 'MicroSeconds', 1824 nano_seconds: str = 'NanoSeconds', 1825 ) -> tuple: 1826 """ 1827 REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 1828 Convert NanoSeconds to Human Readable Time Format. 1829 A NanoSeconds is a unit of time in the International System of Units (SI) equal 1830 to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second. 1831 Its symbol is μs, sometimes simplified to us when Unicode is not available. 1832 A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond. 1833 1834 INPUT : ms (AKA: MilliSeconds) 1835 OUTPUT: tuple(string time_lapsed, string spoken_time) like format. 1836 OUTPUT Variables: time_lapsed, spoken_time 1837 1838 Example Input: duration_from_nanoseconds(ns) 1839 **"Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds"** 1840 Example Output: ('039:0001:047:325:05:02:03:456:789:012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds') 1841 duration_from_nanoseconds(1234567890123456789012) 1842 """ 1843 us, ns = divmod(ns, 1000) 1844 ms, us = divmod(us, 1000) 1845 s, ms = divmod(ms, 1000) 1846 m, s = divmod(s, 60) 1847 h, m = divmod(m, 60) 1848 d, h = divmod(h, 24) 1849 y, d = divmod(d, 365) 1850 c, y = divmod(y, 100) 1851 n, c = divmod(c, 10) 1852 time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}" 1853 spoken_time_part = [] 1854 if n > 0 or show_zeros_in_spoken_time: 1855 spoken_time_part.append(f"{n: 3d} {millennia}") 1856 if c > 0 or show_zeros_in_spoken_time: 1857 spoken_time_part.append(f"{c: 4d} {century}") 1858 if y > 0 or show_zeros_in_spoken_time: 1859 spoken_time_part.append(f"{y: 3d} {years}") 1860 if d > 0 or show_zeros_in_spoken_time: 1861 spoken_time_part.append(f"{d: 4d} {days}") 1862 if h > 0 or show_zeros_in_spoken_time: 1863 spoken_time_part.append(f"{h: 2d} {hours}") 1864 if m > 0 or show_zeros_in_spoken_time: 1865 spoken_time_part.append(f"{m: 2d} {minutes}") 1866 if s > 0 or show_zeros_in_spoken_time: 1867 spoken_time_part.append(f"{s: 2d} {seconds}") 1868 if ms > 0 or show_zeros_in_spoken_time: 1869 spoken_time_part.append(f"{ms: 3d} {milli_seconds}") 1870 if us > 0 or show_zeros_in_spoken_time: 1871 spoken_time_part.append(f"{us: 3d} {micro_seconds}") 1872 if ns > 0 or show_zeros_in_spoken_time: 1873 spoken_time_part.append(f"{ns: 3d} {nano_seconds}") 1874 return time_lapsed, spoken_time_separator.join(spoken_time_part) 1875 1876 @staticmethod 1877 def day_to_time(day: int, month: int = 6, year: int = 2024) -> int: # افتراض أن الشهر هو يونيو والسنة 2024 1878 """ 1879 Convert a specific day, month, and year into a timestamp. 1880 1881 Parameters: 1882 day (int): The day of the month. 1883 month (int): The month of the year. Default is 6 (June). 1884 year (int): The year. Default is 2024. 1885 1886 Returns: 1887 int: The timestamp representing the given day, month, and year. 1888 1889 Note: 1890 This method assumes the default month and year if not provided. 1891 """ 1892 return ZakatTracker.time(datetime.datetime(year, month, day)) 1893 1894 @staticmethod 1895 def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime: 1896 """ 1897 Generate a random date between two given dates. 1898 1899 Parameters: 1900 start_date (datetime.datetime): The start date from which to generate a random date. 1901 end_date (datetime.datetime): The end date until which to generate a random date. 1902 1903 Returns: 1904 datetime.datetime: A random date between the start_date and end_date. 1905 """ 1906 time_between_dates = end_date - start_date 1907 days_between_dates = time_between_dates.days 1908 random_number_of_days = random.randrange(days_between_dates) 1909 return start_date + datetime.timedelta(days=random_number_of_days) 1910 1911 @staticmethod 1912 def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False, 1913 debug: bool = False) -> int: 1914 """ 1915 Generate a random CSV file with specified parameters. 1916 1917 Parameters: 1918 path (str): The path where the CSV file will be saved. Default is "data.csv". 1919 count (int): The number of rows to generate in the CSV file. Default is 1000. 1920 with_rate (bool): If True, a random rate between 1.2% and 12% is added. Default is False. 1921 debug (bool): A flag indicating whether to print debug information. 1922 1923 Returns: 1924 None. The function generates a CSV file at the specified path with the given count of rows. 1925 Each row contains a randomly generated account, description, value, and date. 1926 The value is randomly generated between 1000 and 100000, 1927 and the date is randomly generated between 1950-01-01 and 2023-12-31. 1928 If the row number is not divisible by 13, the value is multiplied by -1. 1929 """ 1930 if debug: 1931 print('generate_random_csv_file', f'debug={debug}') 1932 i = 0 1933 with open(path, "w", newline="") as csvfile: 1934 writer = csv.writer(csvfile) 1935 for i in range(count): 1936 account = f"acc-{random.randint(1, 1000)}" 1937 desc = f"Some text {random.randint(1, 1000)}" 1938 value = random.randint(1000, 100000) 1939 date = ZakatTracker.generate_random_date(datetime.datetime(1950, 1, 1), 1940 datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S") 1941 if not i % 13 == 0: 1942 value *= -1 1943 row = [account, desc, value, date] 1944 if with_rate: 1945 rate = random.randint(1, 100) * 0.12 1946 if debug: 1947 print('before-append', row) 1948 row.append(rate) 1949 if debug: 1950 print('after-append', row) 1951 writer.writerow(row) 1952 i = i + 1 1953 return i 1954 1955 @staticmethod 1956 def create_random_list(max_sum, min_value=0, max_value=10): 1957 """ 1958 Creates a list of random integers whose sum does not exceed the specified maximum. 1959 1960 Args: 1961 max_sum: The maximum allowed sum of the list elements. 1962 min_value: The minimum possible value for an element (inclusive). 1963 max_value: The maximum possible value for an element (inclusive). 1964 1965 Returns: 1966 A list of random integers. 1967 """ 1968 result = [] 1969 current_sum = 0 1970 1971 while current_sum < max_sum: 1972 # Calculate the remaining space for the next element 1973 remaining_sum = max_sum - current_sum 1974 # Determine the maximum possible value for the next element 1975 next_max_value = min(remaining_sum, max_value) 1976 # Generate a random element within the allowed range 1977 next_element = random.randint(min_value, next_max_value) 1978 result.append(next_element) 1979 current_sum += next_element 1980 1981 return result 1982 1983 def _test_core(self, restore=False, debug=False): 1984 1985 if debug: 1986 random.seed(1234567890) 1987 1988 # sanity check - random forward time 1989 1990 xlist = [] 1991 limit = 1000 1992 for _ in range(limit): 1993 y = ZakatTracker.time() 1994 z = '-' 1995 if y not in xlist: 1996 xlist.append(y) 1997 else: 1998 z = 'x' 1999 if debug: 2000 print(z, y) 2001 xx = len(xlist) 2002 if debug: 2003 print('count', xx, ' - unique: ', (xx / limit) * 100, '%') 2004 assert limit == xx 2005 2006 # sanity check - convert date since 1000AD 2007 2008 for year in range(1000, 9000): 2009 ns = ZakatTracker.time(datetime.datetime.strptime(f"{year}-12-30 18:30:45", "%Y-%m-%d %H:%M:%S")) 2010 date = ZakatTracker.time_to_datetime(ns) 2011 if debug: 2012 print(date) 2013 assert date.year == year 2014 assert date.month == 12 2015 assert date.day == 30 2016 assert date.hour == 18 2017 assert date.minute == 30 2018 assert date.second in [44, 45] 2019 2020 # human_readable_size 2021 2022 assert ZakatTracker.human_readable_size(0) == "0.00 B" 2023 assert ZakatTracker.human_readable_size(512) == "512.00 B" 2024 assert ZakatTracker.human_readable_size(1023) == "1023.00 B" 2025 2026 assert ZakatTracker.human_readable_size(1024) == "1.00 KB" 2027 assert ZakatTracker.human_readable_size(2048) == "2.00 KB" 2028 assert ZakatTracker.human_readable_size(5120) == "5.00 KB" 2029 2030 assert ZakatTracker.human_readable_size(1024 ** 2) == "1.00 MB" 2031 assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2) == "2.50 MB" 2032 2033 assert ZakatTracker.human_readable_size(1024 ** 3) == "1.00 GB" 2034 assert ZakatTracker.human_readable_size(1024 ** 4) == "1.00 TB" 2035 assert ZakatTracker.human_readable_size(1024 ** 5) == "1.00 PB" 2036 2037 assert ZakatTracker.human_readable_size(1536, decimal_places=0) == "2 KB" 2038 assert ZakatTracker.human_readable_size(2.5 * 1024 ** 2, decimal_places=1) == "2.5 MB" 2039 assert ZakatTracker.human_readable_size(1234567890, decimal_places=3) == "1.150 GB" 2040 2041 try: 2042 ZakatTracker.human_readable_size("not a number") 2043 assert False, "Expected TypeError for invalid input" 2044 except TypeError: 2045 pass 2046 2047 try: 2048 ZakatTracker.human_readable_size(1024, decimal_places="not an int") 2049 assert False, "Expected TypeError for invalid decimal_places" 2050 except TypeError: 2051 pass 2052 2053 # get_dict_size 2054 assert ZakatTracker.get_dict_size({}) == sys.getsizeof({}), "Empty dictionary size mismatch" 2055 assert ZakatTracker.get_dict_size({"a": 1, "b": 2.5, "c": True}) != sys.getsizeof({}), "Not Empty dictionary" 2056 2057 # number scale 2058 error = 0 2059 total = 0 2060 for return_type in ( 2061 float, 2062 Decimal, 2063 ): 2064 for i in range(101): 2065 for j in range(101): 2066 total += 1 2067 num_str = f'{i}.{j}' 2068 num = return_type(num_str) 2069 scaled = self.scale(num) 2070 unscaled = self.unscale(scaled, return_type=return_type) 2071 if debug: 2072 print(f'return_type: {return_type}, num_str: {num_str} - num: {num} - scaled: {scaled} - unscaled: {unscaled}') 2073 if unscaled != num: 2074 if debug: 2075 print('***** SCALE ERROR *****') 2076 error += 1 2077 if debug: 2078 print(f'total: {total}, error({error}): {100 * error / total}%') 2079 assert error == 0 2080 2081 assert self.nolock() 2082 assert self._history() is True 2083 2084 table = { 2085 1: [ 2086 (0, 10, 10, 10, 10, 1, 1), 2087 (0, 20, 30, 30, 30, 2, 2), 2088 (0, 30, 60, 60, 60, 3, 3), 2089 (1, 15, 45, 45, 45, 3, 4), 2090 (1, 50, -5, -5, -5, 4, 5), 2091 (1, 100, -105, -105, -105, 5, 6), 2092 ], 2093 'wallet': [ 2094 (1, 90, -90, -90, -90, 1, 1), 2095 (0, 100, 10, 10, 10, 2, 2), 2096 (1, 190, -180, -180, -180, 3, 3), 2097 (0, 1000, 820, 820, 820, 4, 4), 2098 ], 2099 } 2100 for x in table: 2101 for y in table[x]: 2102 self.lock() 2103 if y[0] == 0: 2104 ref = self.track(y[1], 'test-add', x, True, ZakatTracker.time(), debug) 2105 else: 2106 (ref, z) = self.sub(y[1], 'test-sub', x, ZakatTracker.time()) 2107 if debug: 2108 print('_sub', z, ZakatTracker.time()) 2109 assert ref != 0 2110 assert len(self._vault['account'][x]['log'][ref]['file']) == 0 2111 for i in range(3): 2112 file_ref = self.add_file(x, ref, 'file_' + str(i)) 2113 sleep(0.0000001) 2114 assert file_ref != 0 2115 if debug: 2116 print('ref', ref, 'file', file_ref) 2117 assert len(self._vault['account'][x]['log'][ref]['file']) == i + 1 2118 file_ref = self.add_file(x, ref, 'file_' + str(3)) 2119 assert self.remove_file(x, ref, file_ref) 2120 assert self.balance(x) == y[2] 2121 z = self.balance(x, False) 2122 if debug: 2123 print("debug-1", z, y[3]) 2124 assert z == y[3] 2125 o = self._vault['account'][x]['log'] 2126 z = 0 2127 for i in o: 2128 z += o[i]['value'] 2129 if debug: 2130 print("debug-2", z, type(z)) 2131 print("debug-2", y[4], type(y[4])) 2132 assert z == y[4] 2133 if debug: 2134 print('debug-2 - PASSED') 2135 assert self.box_size(x) == y[5] 2136 assert self.log_size(x) == y[6] 2137 assert not self.nolock() 2138 self.free(self.lock()) 2139 assert self.nolock() 2140 assert self.boxes(x) != {} 2141 assert self.logs(x) != {} 2142 2143 assert not self.hide(x) 2144 assert self.hide(x, False) is False 2145 assert self.hide(x) is False 2146 assert self.hide(x, True) 2147 assert self.hide(x) 2148 2149 assert self.zakatable(x) 2150 assert self.zakatable(x, False) is False 2151 assert self.zakatable(x) is False 2152 assert self.zakatable(x, True) 2153 assert self.zakatable(x) 2154 2155 if restore is True: 2156 count = len(self._vault['history']) 2157 if debug: 2158 print('history-count', count) 2159 assert count == 10 2160 # try mode 2161 for _ in range(count): 2162 assert self.recall(True, debug) 2163 count = len(self._vault['history']) 2164 if debug: 2165 print('history-count', count) 2166 assert count == 10 2167 _accounts = list(table.keys()) 2168 accounts_limit = len(_accounts) + 1 2169 for i in range(-1, -accounts_limit, -1): 2170 account = _accounts[i] 2171 if debug: 2172 print(account, len(table[account])) 2173 transaction_limit = len(table[account]) + 1 2174 for j in range(-1, -transaction_limit, -1): 2175 row = table[account][j] 2176 if debug: 2177 print(row, self.balance(account), self.balance(account, False)) 2178 assert self.balance(account) == self.balance(account, False) 2179 assert self.balance(account) == row[2] 2180 assert self.recall(False, debug) 2181 assert self.recall(False, debug) is False 2182 count = len(self._vault['history']) 2183 if debug: 2184 print('history-count', count) 2185 assert count == 0 2186 self.reset() 2187 2188 def test(self, debug: bool = False) -> bool: 2189 if debug: 2190 print('test', f'debug={debug}') 2191 try: 2192 2193 assert self._history() 2194 2195 # Not allowed for duplicate transactions in the same account and time 2196 2197 created = ZakatTracker.time() 2198 self.track(100, 'test-1', 'same', True, created) 2199 failed = False 2200 try: 2201 self.track(50, 'test-1', 'same', True, created) 2202 except: 2203 failed = True 2204 assert failed is True 2205 2206 self.reset() 2207 2208 # Same account transfer 2209 for x in [1, 'a', True, 1.8, None]: 2210 failed = False 2211 try: 2212 self.transfer(1, x, x, 'same-account', debug=debug) 2213 except: 2214 failed = True 2215 assert failed is True 2216 2217 # Always preserve box age during transfer 2218 2219 series: list[tuple] = [ 2220 (30, 4), 2221 (60, 3), 2222 (90, 2), 2223 ] 2224 case = { 2225 30: { 2226 'series': series, 2227 'rest': 150, 2228 }, 2229 60: { 2230 'series': series, 2231 'rest': 120, 2232 }, 2233 90: { 2234 'series': series, 2235 'rest': 90, 2236 }, 2237 180: { 2238 'series': series, 2239 'rest': 0, 2240 }, 2241 270: { 2242 'series': series, 2243 'rest': -90, 2244 }, 2245 360: { 2246 'series': series, 2247 'rest': -180, 2248 }, 2249 } 2250 2251 selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle() 2252 2253 for total in case: 2254 for x in case[total]['series']: 2255 self.track(x[0], f"test-{x} ages", 'ages', True, selected_time * x[1]) 2256 2257 refs = self.transfer(total, 'ages', 'future', 'Zakat Movement', debug=debug) 2258 2259 if debug: 2260 print('refs', refs) 2261 2262 ages_cache_balance = self.balance('ages') 2263 ages_fresh_balance = self.balance('ages', False) 2264 rest = case[total]['rest'] 2265 if debug: 2266 print('source', ages_cache_balance, ages_fresh_balance, rest) 2267 assert ages_cache_balance == rest 2268 assert ages_fresh_balance == rest 2269 2270 future_cache_balance = self.balance('future') 2271 future_fresh_balance = self.balance('future', False) 2272 if debug: 2273 print('target', future_cache_balance, future_fresh_balance, total) 2274 print('refs', refs) 2275 assert future_cache_balance == total 2276 assert future_fresh_balance == total 2277 2278 for ref in self._vault['account']['ages']['box']: 2279 ages_capital = self._vault['account']['ages']['box'][ref]['capital'] 2280 ages_rest = self._vault['account']['ages']['box'][ref]['rest'] 2281 future_capital = 0 2282 if ref in self._vault['account']['future']['box']: 2283 future_capital = self._vault['account']['future']['box'][ref]['capital'] 2284 future_rest = 0 2285 if ref in self._vault['account']['future']['box']: 2286 future_rest = self._vault['account']['future']['box'][ref]['rest'] 2287 if ages_capital != 0 and future_capital != 0 and future_rest != 0: 2288 if debug: 2289 print('================================================================') 2290 print('ages', ages_capital, ages_rest) 2291 print('future', future_capital, future_rest) 2292 if ages_rest == 0: 2293 assert ages_capital == future_capital 2294 elif ages_rest < 0: 2295 assert -ages_capital == future_capital 2296 elif ages_rest > 0: 2297 assert ages_capital == ages_rest + future_capital 2298 self.reset() 2299 assert len(self._vault['history']) == 0 2300 2301 assert self._history() 2302 assert self._history(False) is False 2303 assert self._history() is False 2304 assert self._history(True) 2305 assert self._history() 2306 2307 self._test_core(True, debug) 2308 self._test_core(False, debug) 2309 2310 transaction = [ 2311 ( 2312 20, 'wallet', 1, 800, 800, 800, 4, 5, 2313 -85, -85, -85, 6, 7, 2314 ), 2315 ( 2316 750, 'wallet', 'safe', 50, 50, 50, 4, 6, 2317 750, 750, 750, 1, 1, 2318 ), 2319 ( 2320 600, 'safe', 'bank', 150, 150, 150, 1, 2, 2321 600, 600, 600, 1, 1, 2322 ), 2323 ] 2324 for z in transaction: 2325 self.lock() 2326 x = z[1] 2327 y = z[2] 2328 self.transfer(z[0], x, y, 'test-transfer', debug=debug) 2329 assert self.balance(x) == z[3] 2330 xx = self.accounts()[x] 2331 assert xx == z[3] 2332 assert self.balance(x, False) == z[4] 2333 assert xx == z[4] 2334 2335 s = 0 2336 log = self._vault['account'][x]['log'] 2337 for i in log: 2338 s += log[i]['value'] 2339 if debug: 2340 print('s', s, 'z[5]', z[5]) 2341 assert s == z[5] 2342 2343 assert self.box_size(x) == z[6] 2344 assert self.log_size(x) == z[7] 2345 2346 yy = self.accounts()[y] 2347 assert self.balance(y) == z[8] 2348 assert yy == z[8] 2349 assert self.balance(y, False) == z[9] 2350 assert yy == z[9] 2351 2352 s = 0 2353 log = self._vault['account'][y]['log'] 2354 for i in log: 2355 s += log[i]['value'] 2356 assert s == z[10] 2357 2358 assert self.box_size(y) == z[11] 2359 assert self.log_size(y) == z[12] 2360 2361 if debug: 2362 pp().pprint(self.check(2.17)) 2363 2364 assert not self.nolock() 2365 history_count = len(self._vault['history']) 2366 if debug: 2367 print('history-count', history_count) 2368 assert history_count == 11 2369 assert not self.free(ZakatTracker.time()) 2370 assert self.free(self.lock()) 2371 assert self.nolock() 2372 assert len(self._vault['history']) == 11 2373 2374 # storage 2375 2376 _path = self.path('test.pickle') 2377 if os.path.exists(_path): 2378 os.remove(_path) 2379 self.save() 2380 assert os.path.getsize(_path) > 0 2381 self.reset() 2382 assert self.recall(False, debug) is False 2383 self.load() 2384 assert self._vault['account'] is not None 2385 2386 # recall 2387 2388 assert self.nolock() 2389 assert len(self._vault['history']) == 11 2390 assert self.recall(False, debug) is True 2391 assert len(self._vault['history']) == 10 2392 assert self.recall(False, debug) is True 2393 assert len(self._vault['history']) == 9 2394 2395 # exchange 2396 2397 self.exchange("cash", 25, 3.75, "2024-06-25") 2398 self.exchange("cash", 22, 3.73, "2024-06-22") 2399 self.exchange("cash", 15, 3.69, "2024-06-15") 2400 self.exchange("cash", 10, 3.66) 2401 2402 for i in range(1, 30): 2403 exchange = self.exchange("cash", i) 2404 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2405 if debug: 2406 print(i, rate, description, created) 2407 assert created 2408 if i < 10: 2409 assert rate == 1 2410 assert description is None 2411 elif i == 10: 2412 assert rate == 3.66 2413 assert description is None 2414 elif i < 15: 2415 assert rate == 3.66 2416 assert description is None 2417 elif i == 15: 2418 assert rate == 3.69 2419 assert description is not None 2420 elif i < 22: 2421 assert rate == 3.69 2422 assert description is not None 2423 elif i == 22: 2424 assert rate == 3.73 2425 assert description is not None 2426 elif i >= 25: 2427 assert rate == 3.75 2428 assert description is not None 2429 exchange = self.exchange("bank", i) 2430 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2431 if debug: 2432 print(i, rate, description, created) 2433 assert created 2434 assert rate == 1 2435 assert description is None 2436 2437 assert len(self._vault['exchange']) > 0 2438 assert len(self.exchanges()) > 0 2439 self._vault['exchange'].clear() 2440 assert len(self._vault['exchange']) == 0 2441 assert len(self.exchanges()) == 0 2442 2443 # حفظ أسعار الصرف باستخدام التواريخ بالنانو ثانية 2444 self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25") 2445 self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22") 2446 self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15") 2447 self.exchange("cash", ZakatTracker.day_to_time(10), 3.66) 2448 2449 for i in [x * 0.12 for x in range(-15, 21)]: 2450 if i <= 0: 2451 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0 2452 else: 2453 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0 2454 2455 # اختبار النتائج باستخدام التواريخ بالنانو ثانية 2456 for i in range(1, 31): 2457 timestamp_ns = ZakatTracker.day_to_time(i) 2458 exchange = self.exchange("cash", timestamp_ns) 2459 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2460 if debug: 2461 print(i, rate, description, created) 2462 assert created 2463 if i < 10: 2464 assert rate == 1 2465 assert description is None 2466 elif i == 10: 2467 assert rate == 3.66 2468 assert description is None 2469 elif i < 15: 2470 assert rate == 3.66 2471 assert description is None 2472 elif i == 15: 2473 assert rate == 3.69 2474 assert description is not None 2475 elif i < 22: 2476 assert rate == 3.69 2477 assert description is not None 2478 elif i == 22: 2479 assert rate == 3.73 2480 assert description is not None 2481 elif i >= 25: 2482 assert rate == 3.75 2483 assert description is not None 2484 exchange = self.exchange("bank", i) 2485 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2486 if debug: 2487 print(i, rate, description, created) 2488 assert created 2489 assert rate == 1 2490 assert description is None 2491 2492 # csv 2493 2494 csv_count = 1000 2495 2496 for with_rate, path in { 2497 False: 'test-import_csv-no-exchange', 2498 True: 'test-import_csv-with-exchange', 2499 }.items(): 2500 2501 if debug: 2502 print('test_import_csv', with_rate, path) 2503 2504 # csv 2505 2506 csv_path = path + '.csv' 2507 if os.path.exists(csv_path): 2508 os.remove(csv_path) 2509 c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug) 2510 if debug: 2511 print('generate_random_csv_file', c) 2512 assert c == csv_count 2513 assert os.path.getsize(csv_path) > 0 2514 cache_path = self.import_csv_cache_path() 2515 if os.path.exists(cache_path): 2516 os.remove(cache_path) 2517 self.reset() 2518 (created, found, bad) = self.import_csv(csv_path, debug) 2519 bad_count = len(bad) 2520 if debug: 2521 print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})") 2522 tmp_size = os.path.getsize(cache_path) 2523 assert tmp_size > 0 2524 assert created + found + bad_count == csv_count 2525 assert created == csv_count 2526 assert bad_count == 0 2527 (created_2, found_2, bad_2) = self.import_csv(csv_path) 2528 bad_2_count = len(bad_2) 2529 if debug: 2530 print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})") 2531 print(bad) 2532 assert tmp_size == os.path.getsize(cache_path) 2533 assert created_2 + found_2 + bad_2_count == csv_count 2534 assert created == found_2 2535 assert bad_count == bad_2_count 2536 assert found_2 == csv_count 2537 assert bad_2_count == 0 2538 assert created_2 == 0 2539 2540 # payment parts 2541 2542 positive_parts = self.build_payment_parts(100, positive_only=True) 2543 assert self.check_payment_parts(positive_parts) != 0 2544 assert self.check_payment_parts(positive_parts) != 0 2545 all_parts = self.build_payment_parts(300, positive_only=False) 2546 assert self.check_payment_parts(all_parts) != 0 2547 assert self.check_payment_parts(all_parts) != 0 2548 if debug: 2549 pp().pprint(positive_parts) 2550 pp().pprint(all_parts) 2551 # dynamic discount 2552 suite = [] 2553 count = 3 2554 for exceed in [False, True]: 2555 case = [] 2556 for parts in [positive_parts, all_parts]: 2557 part = parts.copy() 2558 demand = part['demand'] 2559 if debug: 2560 print(demand, part['total']) 2561 i = 0 2562 z = demand / count 2563 cp = { 2564 'account': {}, 2565 'demand': demand, 2566 'exceed': exceed, 2567 'total': part['total'], 2568 } 2569 j = '' 2570 for x, y in part['account'].items(): 2571 x_exchange = self.exchange(x) 2572 zz = self.exchange_calc(z, 1, x_exchange['rate']) 2573 if exceed and zz <= demand: 2574 i += 1 2575 y['part'] = zz 2576 if debug: 2577 print(exceed, y) 2578 cp['account'][x] = y 2579 case.append(y) 2580 elif not exceed and y['balance'] >= zz: 2581 i += 1 2582 y['part'] = zz 2583 if debug: 2584 print(exceed, y) 2585 cp['account'][x] = y 2586 case.append(y) 2587 j = x 2588 if i >= count: 2589 break 2590 if len(cp['account'][j]) > 0: 2591 suite.append(cp) 2592 if debug: 2593 print('suite', len(suite)) 2594 # vault = self._vault.copy() 2595 for case in suite: 2596 # self._vault = vault.copy() 2597 if debug: 2598 print('case', case) 2599 result = self.check_payment_parts(case) 2600 if debug: 2601 print('check_payment_parts', result, f'exceed: {exceed}') 2602 assert result == 0 2603 2604 report = self.check(2.17, None, debug) 2605 (valid, brief, plan) = report 2606 if debug: 2607 print('valid', valid) 2608 zakat_result = self.zakat(report, parts=case, debug=debug) 2609 if debug: 2610 print('zakat-result', zakat_result) 2611 assert valid == zakat_result 2612 2613 assert self.save(path + '.pickle') 2614 assert self.export_json(path + '.json') 2615 2616 assert self.export_json("1000-transactions-test.json") 2617 assert self.save("1000-transactions-test.pickle") 2618 2619 self.reset() 2620 2621 # test transfer between accounts with different exchange rate 2622 2623 a_SAR = "Bank (SAR)" 2624 b_USD = "Bank (USD)" 2625 c_SAR = "Safe (SAR)" 2626 # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer 2627 for case in [ 2628 (0, a_SAR, "SAR Gift", 1000, 1000), 2629 (1, a_SAR, 1), 2630 (0, b_USD, "USD Gift", 500, 500), 2631 (1, b_USD, 1), 2632 (2, b_USD, 3.75), 2633 (1, b_USD, 3.75), 2634 (3, 100, b_USD, a_SAR, "100 USD -> SAR", 400, 1375), 2635 (0, c_SAR, "Salary", 750, 750), 2636 (3, 375, c_SAR, b_USD, "375 SAR -> USD", 375, 500), 2637 (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 1371.25, 501), 2638 ]: 2639 match (case[0]): 2640 case 0: # track 2641 _, account, desc, x, balance = case 2642 self.track(value=x, desc=desc, account=account, debug=debug) 2643 2644 cached_value = self.balance(account, cached=True) 2645 fresh_value = self.balance(account, cached=False) 2646 if debug: 2647 print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value) 2648 assert cached_value == balance 2649 assert fresh_value == balance 2650 case 1: # check-exchange 2651 _, account, expected_rate = case 2652 t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2653 if debug: 2654 print('t-exchange', t_exchange) 2655 assert t_exchange['rate'] == expected_rate 2656 case 2: # do-exchange 2657 _, account, rate = case 2658 self.exchange(account, rate=rate, debug=debug) 2659 b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2660 if debug: 2661 print('b-exchange', b_exchange) 2662 assert b_exchange['rate'] == rate 2663 case 3: # transfer 2664 _, x, a, b, desc, a_balance, b_balance = case 2665 self.transfer(x, a, b, desc, debug=debug) 2666 2667 cached_value = self.balance(a, cached=True) 2668 fresh_value = self.balance(a, cached=False) 2669 if debug: 2670 print('account', a, 'cached_value', cached_value, 'fresh_value', fresh_value) 2671 assert cached_value == a_balance 2672 assert fresh_value == a_balance 2673 2674 cached_value = self.balance(b, cached=True) 2675 fresh_value = self.balance(b, cached=False) 2676 if debug: 2677 print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value) 2678 assert cached_value == b_balance 2679 assert fresh_value == b_balance 2680 2681 # Transfer all in many chunks randomly from B to A 2682 a_SAR_balance = 1371.25 2683 b_USD_balance = 501 2684 b_USD_exchange = self.exchange(b_USD) 2685 amounts = ZakatTracker.create_random_list(b_USD_balance) 2686 if debug: 2687 print('amounts', amounts) 2688 i = 0 2689 for x in amounts: 2690 if debug: 2691 print(f'{i} - transfer-with-exchange({x})') 2692 self.transfer(x, b_USD, a_SAR, f"{x} USD -> SAR", debug=debug) 2693 2694 b_USD_balance -= x 2695 cached_value = self.balance(b_USD, cached=True) 2696 fresh_value = self.balance(b_USD, cached=False) 2697 if debug: 2698 print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 2699 b_USD_balance) 2700 assert cached_value == b_USD_balance 2701 assert fresh_value == b_USD_balance 2702 2703 a_SAR_balance += x * b_USD_exchange['rate'] 2704 cached_value = self.balance(a_SAR, cached=True) 2705 fresh_value = self.balance(a_SAR, cached=False) 2706 if debug: 2707 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 2708 a_SAR_balance, 'rate', b_USD_exchange['rate']) 2709 assert cached_value == a_SAR_balance 2710 assert fresh_value == a_SAR_balance 2711 i += 1 2712 2713 # Transfer all in many chunks randomly from C to A 2714 c_SAR_balance = 375 2715 amounts = ZakatTracker.create_random_list(c_SAR_balance) 2716 if debug: 2717 print('amounts', amounts) 2718 i = 0 2719 for x in amounts: 2720 if debug: 2721 print(f'{i} - transfer-with-exchange({x})') 2722 self.transfer(x, c_SAR, a_SAR, f"{x} SAR -> a_SAR", debug=debug) 2723 2724 c_SAR_balance -= x 2725 cached_value = self.balance(c_SAR, cached=True) 2726 fresh_value = self.balance(c_SAR, cached=False) 2727 if debug: 2728 print('account', c_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 2729 c_SAR_balance) 2730 assert cached_value == c_SAR_balance 2731 assert fresh_value == c_SAR_balance 2732 2733 a_SAR_balance += x 2734 cached_value = self.balance(a_SAR, cached=True) 2735 fresh_value = self.balance(a_SAR, cached=False) 2736 if debug: 2737 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 2738 a_SAR_balance) 2739 assert cached_value == a_SAR_balance 2740 assert fresh_value == a_SAR_balance 2741 i += 1 2742 2743 assert self.export_json("accounts-transfer-with-exchange-rates.json") 2744 assert self.save("accounts-transfer-with-exchange-rates.pickle") 2745 2746 # check & zakat with exchange rates for many cycles 2747 2748 for rate, values in { 2749 1: { 2750 'in': [1000, 2000, 10000], 2751 'exchanged': [1000, 2000, 10000], 2752 'out': [25, 50, 731.40625], 2753 }, 2754 3.75: { 2755 'in': [200, 1000, 5000], 2756 'exchanged': [750, 3750, 18750], 2757 'out': [18.75, 93.75, 1371.38671875], 2758 }, 2759 }.items(): 2760 a, b, c = values['in'] 2761 m, n, o = values['exchanged'] 2762 x, y, z = values['out'] 2763 if debug: 2764 print('rate', rate, 'values', values) 2765 for case in [ 2766 (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 2767 {'safe': {0: {'below_nisab': x}}}, 2768 ], False, m), 2769 (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 2770 {'safe': {0: {'count': 1, 'total': y}}}, 2771 ], True, n), 2772 (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [ 2773 {'cave': {0: {'count': 3, 'total': z}}}, 2774 ], True, o), 2775 ]: 2776 if debug: 2777 print(f"############# check(rate: {rate}) #############") 2778 self.reset() 2779 self.exchange(account=case[1], created=case[2], rate=rate) 2780 self.track(value=case[0], desc='test-check', account=case[1], logging=True, created=case[2]) 2781 2782 # assert self.nolock() 2783 # history_size = len(self._vault['history']) 2784 # print('history_size', history_size) 2785 # assert history_size == 2 2786 assert self.lock() 2787 assert not self.nolock() 2788 report = self.check(2.17, None, debug) 2789 (valid, brief, plan) = report 2790 assert valid == case[4] 2791 if debug: 2792 print('brief', brief) 2793 assert case[5] == brief[0] 2794 assert case[5] == brief[1] 2795 2796 if debug: 2797 pp().pprint(plan) 2798 2799 for x in plan: 2800 assert case[1] == x 2801 if 'total' in case[3][0][x][0].keys(): 2802 assert case[3][0][x][0]['total'] == brief[2] 2803 assert plan[x][0]['total'] == case[3][0][x][0]['total'] 2804 assert plan[x][0]['count'] == case[3][0][x][0]['count'] 2805 else: 2806 assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab'] 2807 if debug: 2808 pp().pprint(report) 2809 result = self.zakat(report, debug=debug) 2810 if debug: 2811 print('zakat-result', result, case[4]) 2812 assert result == case[4] 2813 report = self.check(2.17, None, debug) 2814 (valid, brief, plan) = report 2815 assert valid is False 2816 2817 history_size = len(self._vault['history']) 2818 if debug: 2819 print('history_size', history_size) 2820 assert history_size == 3 2821 assert not self.nolock() 2822 assert self.recall(False, debug) is False 2823 self.free(self.lock()) 2824 assert self.nolock() 2825 2826 for i in range(3, 0, -1): 2827 history_size = len(self._vault['history']) 2828 if debug: 2829 print('history_size', history_size) 2830 assert history_size == i 2831 assert self.recall(False, debug) is True 2832 2833 assert self.nolock() 2834 assert self.recall(False, debug) is False 2835 2836 history_size = len(self._vault['history']) 2837 if debug: 2838 print('history_size', history_size) 2839 assert history_size == 0 2840 2841 account_size = len(self._vault['account']) 2842 if debug: 2843 print('account_size', account_size) 2844 assert account_size == 0 2845 2846 report_size = len(self._vault['report']) 2847 if debug: 2848 print('report_size', report_size) 2849 assert report_size == 0 2850 2851 assert self.nolock() 2852 return True 2853 except: 2854 # pp().pprint(self._vault) 2855 assert self.export_json("test-snapshot.json") 2856 assert self.save("test-snapshot.pickle") 2857 raise
A class for tracking and calculating Zakat.
This class provides functionalities for recording transactions, calculating Zakat due, and managing account balances. It also offers features like importing transactions from CSV files, exporting data to JSON format, and saving/loading the tracker state.
The ZakatTracker
class is designed to handle both positive and negative transactions,
allowing for flexible tracking of financial activities related to Zakat. It also supports
the concept of a "Nisab" (minimum threshold for Zakat) and a "haul" (complete one year for Transaction) can calculate Zakat due
based on the current silver price.
The class uses a pickle file as its database to persist the tracker state, ensuring data integrity across sessions. It also provides options for enabling or disabling history tracking, allowing users to choose their preferred level of detail.
In addition, the ZakatTracker
class includes various helper methods like
time
, time_to_datetime
, lock
, free
, recall
, export_json
,
and more. These methods provide additional functionalities and flexibility
for interacting with and managing the Zakat tracker.
Attributes: ZakatTracker.ZakatCut (function): A function to calculate the Zakat percentage. ZakatTracker.TimeCycle (function): A function to determine the time cycle for Zakat. ZakatTracker.Nisab (function): A function to calculate the Nisab based on the silver price. ZakatTracker.Version (function): The version of the ZakatTracker class.
Data Structure: The ZakatTracker class utilizes a nested dictionary structure called "_vault" to store and manage data.
_vault (dict):
- account (dict):
- {account_number} (dict):
- balance (int): The current balance of the account.
- box (dict): A dictionary storing transaction details.
- {timestamp} (dict):
- capital (int): The initial amount of the transaction.
- count (int): The number of times Zakat has been calculated for this transaction.
- last (int): The timestamp of the last Zakat calculation.
- rest (int): The remaining amount after Zakat deductions and withdrawal.
- total (int): The total Zakat deducted from this transaction.
- count (int): The total number of transactions for the account.
- log (dict): A dictionary storing transaction logs.
- {timestamp} (dict):
- value (int): The transaction amount (positive or negative).
- desc (str): The description of the transaction.
- ref (int): The box reference (positive or None).
- file (dict): A dictionary storing file references associated with the transaction.
- hide (bool): Indicates whether the account is hidden or not.
- zakatable (bool): Indicates whether the account is subject to Zakat.
- exchange (dict):
- account (dict):
- {timestamps} (dict):
- rate (float): Exchange rate when compared to local currency.
- description (str): The description of the exchange rate.
- history (dict):
- {timestamp} (list): A list of dictionaries storing the history of actions performed.
- {action_dict} (dict):
- action (Action): The type of action (CREATE, TRACK, LOG, SUB, ADD_FILE, REMOVE_FILE, BOX_TRANSFER, EXCHANGE, REPORT, ZAKAT).
- account (str): The account number associated with the action.
- ref (int): The reference number of the transaction.
- file (int): The reference number of the file (if applicable).
- key (str): The key associated with the action (e.g., 'rest', 'total').
- value (int): The value associated with the action.
- math (MathOperation): The mathematical operation performed (if applicable).
- lock (int or None): The timestamp indicating the current lock status (None if not locked).
- report (dict):
- {timestamp} (tuple): A tuple storing Zakat report details.
239 def __init__(self, db_path: str = "zakat.pickle", history_mode: bool = True): 240 """ 241 Initialize ZakatTracker with database path and history mode. 242 243 Parameters: 244 db_path (str): The path to the database file. Default is "zakat.pickle". 245 history_mode (bool): The mode for tracking history. Default is True. 246 247 Returns: 248 None 249 """ 250 self._vault_path = None 251 self._vault = None 252 self.reset() 253 self._history(history_mode) 254 self.path(db_path) 255 self.load()
Initialize ZakatTracker with database path and history mode.
Parameters: db_path (str): The path to the database file. Default is "zakat.pickle". history_mode (bool): The mode for tracking history. Default is True.
Returns: None
173 @staticmethod 174 def Version(): 175 """ 176 Returns the current version of the software. 177 178 This function returns a string representing the current version of the software, 179 including major, minor, and patch version numbers in the format "X.Y.Z". 180 181 Returns: 182 str: The current version of the software. 183 """ 184 return '0.2.76'
Returns the current version of the software.
This function returns a string representing the current version of the software, including major, minor, and patch version numbers in the format "X.Y.Z".
Returns: str: The current version of the software.
186 @staticmethod 187 def ZakatCut(x: float) -> float: 188 """ 189 Calculates the Zakat amount due on an asset. 190 191 This function calculates the zakat amount due on a given asset value over one lunar year. 192 Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth 193 that exceeds a certain threshold (Nisab). 194 195 Parameters: 196 x: The total value of the asset on which Zakat is to be calculated. 197 198 Returns: 199 The amount of Zakat due on the asset, calculated as 2.5% of the asset's value. 200 """ 201 return 0.025 * x # Zakat Cut in one Lunar Year
Calculates the Zakat amount due on an asset.
This function calculates the zakat amount due on a given asset value over one lunar year. Zakat is an Islamic obligatory alms-giving, calculated as a fixed percentage of an individual's wealth that exceeds a certain threshold (Nisab).
Parameters: x: The total value of the asset on which Zakat is to be calculated.
Returns: The amount of Zakat due on the asset, calculated as 2.5% of the asset's value.
203 @staticmethod 204 def TimeCycle(days: int = 355) -> int: 205 """ 206 Calculates the approximate duration of a lunar year in nanoseconds. 207 208 This function calculates the approximate duration of a lunar year based on the given number of days. 209 It converts the given number of days into nanoseconds for use in high-precision timing applications. 210 211 Parameters: 212 days: The number of days in a lunar year. Defaults to 355, 213 which is an approximation of the average length of a lunar year. 214 215 Returns: 216 The approximate duration of a lunar year in nanoseconds. 217 """ 218 return int(60 * 60 * 24 * days * 1e9) # Lunar Year in nanoseconds
Calculates the approximate duration of a lunar year in nanoseconds.
This function calculates the approximate duration of a lunar year based on the given number of days. It converts the given number of days into nanoseconds for use in high-precision timing applications.
Parameters: days: The number of days in a lunar year. Defaults to 355, which is an approximation of the average length of a lunar year.
Returns: The approximate duration of a lunar year in nanoseconds.
220 @staticmethod 221 def Nisab(gram_price: float, gram_quantity: float = 595) -> float: 222 """ 223 Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram. 224 225 This function calculates the Nisab value, which is the minimum threshold of wealth, 226 that makes an individual liable for paying Zakat. 227 The Nisab value is determined by the equivalent value of a specific amount 228 of gold or silver (currently 595 grams in silver) in the local currency. 229 230 Parameters: 231 - gram_price (float): The price per gram of Nisab. 232 - gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver. 233 234 Returns: 235 - float: The total value of Nisab based on the given price per gram. 236 """ 237 return gram_price * gram_quantity
Calculate the total value of Nisab (a unit of weight in Islamic jurisprudence) based on the given price per gram.
This function calculates the Nisab value, which is the minimum threshold of wealth, that makes an individual liable for paying Zakat. The Nisab value is determined by the equivalent value of a specific amount of gold or silver (currently 595 grams in silver) in the local currency.
Parameters:
- gram_price (float): The price per gram of Nisab.
- gram_quantity (float): The quantity of grams in a Nisab. Default is 595 grams of silver.
Returns:
- float: The total value of Nisab based on the given price per gram.
257 def path(self, path: str = None) -> str: 258 """ 259 Set or get the database path. 260 261 Parameters: 262 path (str): The path to the database file. If not provided, it returns the current path. 263 264 Returns: 265 str: The current database path. 266 """ 267 if path is not None: 268 self._vault_path = path 269 return self._vault_path
Set or get the database path.
Parameters: path (str): The path to the database file. If not provided, it returns the current path.
Returns: str: The current database path.
271 @staticmethod 272 def scale(x: float | int | Decimal, decimal_places: int = 2) -> int: 273 """ 274 Scales a numerical value by a specified power of 10, returning an integer. 275 276 This function is designed to handle various numeric types (`float`, `int`, or `Decimal`) and 277 facilitate precise scaling operations, particularly useful in financial or scientific calculations. 278 279 Parameters: 280 x: The numeric value to scale. Can be a floating-point number, integer, or decimal. 281 decimal_places: The exponent for the scaling factor (10**y). Defaults to 2, meaning the input is scaled 282 by a factor of 100 (e.g., converts 1.23 to 123). 283 284 Returns: 285 The scaled value, rounded to the nearest integer. 286 287 Raises: 288 TypeError: If the input `x` is not a valid numeric type. 289 290 Examples: 291 >>> scale(3.14159) 292 314 293 >>> scale(1234, decimal_places=3) 294 1234000 295 >>> scale(Decimal("0.005"), decimal_places=4) 296 50 297 """ 298 if not isinstance(x, (float, int, Decimal)): 299 raise TypeError("Input 'x' must be a float, int, or Decimal.") 300 return int(Decimal(f"{x:.{decimal_places}f}") * (10 ** decimal_places))
Scales a numerical value by a specified power of 10, returning an integer.
This function is designed to handle various numeric types (float
, int
, or Decimal
) and
facilitate precise scaling operations, particularly useful in financial or scientific calculations.
Parameters: x: The numeric value to scale. Can be a floating-point number, integer, or decimal. decimal_places: The exponent for the scaling factor (10**y). Defaults to 2, meaning the input is scaled by a factor of 100 (e.g., converts 1.23 to 123).
Returns: The scaled value, rounded to the nearest integer.
Raises:
TypeError: If the input x
is not a valid numeric type.
Examples:
>>> scale(3.14159)
314
>>> scale(1234, decimal_places=3)
1234000
>>> scale(Decimal("0.005"), decimal_places=4)
50
302 @staticmethod 303 def unscale(x: int, return_type: type = float, decimal_places: int = 2) -> float | Decimal: 304 """ 305 Unscales an integer by a power of 10. 306 307 Parameters: 308 x: The integer to unscale. 309 return_type: The desired type for the returned value. Can be float, int, or Decimal. Defaults to float. 310 decimal_places: The power of 10 to use. Defaults to 2. 311 312 Returns: 313 The unscaled number, converted to the specified return_type. 314 315 Raises: 316 TypeError: If the return_type is not float or Decimal. 317 """ 318 if return_type not in (float, Decimal): 319 raise TypeError(f'Invalid return_type({return_type}). Supported types are float, int, and Decimal.') 320 return round(return_type(x / (10 ** decimal_places)), decimal_places)
Unscales an integer by a power of 10.
Parameters: x: The integer to unscale. return_type: The desired type for the returned value. Can be float, int, or Decimal. Defaults to float. decimal_places: The power of 10 to use. Defaults to 2.
Returns: The unscaled number, converted to the specified return_type.
Raises: TypeError: If the return_type is not float or Decimal.
336 def reset(self) -> None: 337 """ 338 Reset the internal data structure to its initial state. 339 340 Parameters: 341 None 342 343 Returns: 344 None 345 """ 346 self._vault = { 347 'account': {}, 348 'exchange': {}, 349 'history': {}, 350 'lock': None, 351 'report': {}, 352 }
Reset the internal data structure to its initial state.
Parameters: None
Returns: None
354 @staticmethod 355 def time(now: datetime = None) -> int: 356 """ 357 Generates a timestamp based on the provided datetime object or the current datetime. 358 359 Parameters: 360 now (datetime, optional): The datetime object to generate the timestamp from. 361 If not provided, the current datetime is used. 362 363 Returns: 364 int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970), 365 before 1970 will return in negative until 1000AD. 366 """ 367 if now is None: 368 now = datetime.datetime.now() 369 ordinal_day = now.toordinal() 370 ns_in_day = (now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() * 10 ** 9 371 return int((ordinal_day - 719_163) * 86_400_000_000_000 + ns_in_day)
Generates a timestamp based on the provided datetime object or the current datetime.
Parameters: now (datetime, optional): The datetime object to generate the timestamp from. If not provided, the current datetime is used.
Returns: int: The timestamp in positive nanoseconds since the Unix epoch (January 1, 1970), before 1970 will return in negative until 1000AD.
373 @staticmethod 374 def time_to_datetime(ordinal_ns: int) -> datetime: 375 """ 376 Converts an ordinal number (number of days since 1000-01-01) to a datetime object. 377 378 Parameters: 379 ordinal_ns (int): The ordinal number of days since 1000-01-01. 380 381 Returns: 382 datetime: The corresponding datetime object. 383 """ 384 ordinal_day = ordinal_ns // 86_400_000_000_000 + 719_163 385 ns_in_day = ordinal_ns % 86_400_000_000_000 386 d = datetime.datetime.fromordinal(ordinal_day) 387 t = datetime.timedelta(seconds=ns_in_day // 10 ** 9) 388 return datetime.datetime.combine(d, datetime.time()) + t
Converts an ordinal number (number of days since 1000-01-01) to a datetime object.
Parameters: ordinal_ns (int): The ordinal number of days since 1000-01-01.
Returns: datetime: The corresponding datetime object.
390 def clean_history(self, lock: int | None = None) -> int: 391 """ 392 Cleans up the history of actions performed on the ZakatTracker instance. 393 394 Parameters: 395 lock (int, optional): The lock ID is used to clean up the empty history. 396 If not provided, it cleans up the empty history records for all locks. 397 398 Returns: 399 int: The number of locks cleaned up. 400 """ 401 count = 0 402 if lock in self._vault['history']: 403 if len(self._vault['history'][lock]) <= 0: 404 count += 1 405 del self._vault['history'][lock] 406 return count 407 self.free(self.lock()) 408 for lock in self._vault['history']: 409 if len(self._vault['history'][lock]) <= 0: 410 count += 1 411 del self._vault['history'][lock] 412 return count
Cleans up the history of actions performed on the ZakatTracker instance.
Parameters: lock (int, optional): The lock ID is used to clean up the empty history. If not provided, it cleans up the empty history records for all locks.
Returns: int: The number of locks cleaned up.
450 def nolock(self) -> bool: 451 """ 452 Check if the vault lock is currently not set. 453 454 Returns: 455 bool: True if the vault lock is not set, False otherwise. 456 """ 457 return self._vault['lock'] is None
Check if the vault lock is currently not set.
Returns: bool: True if the vault lock is not set, False otherwise.
459 def lock(self) -> int: 460 """ 461 Acquires a lock on the ZakatTracker instance. 462 463 Returns: 464 int: The lock ID. This ID can be used to release the lock later. 465 """ 466 return self._step()
Acquires a lock on the ZakatTracker instance.
Returns: int: The lock ID. This ID can be used to release the lock later.
468 def vault(self) -> dict: 469 """ 470 Returns a copy of the internal vault dictionary. 471 472 This method is used to retrieve the current state of the ZakatTracker object. 473 It provides a snapshot of the internal data structure, allowing for further 474 processing or analysis. 475 476 Returns: 477 dict: A copy of the internal vault dictionary. 478 """ 479 return self._vault.copy()
Returns a copy of the internal vault dictionary.
This method is used to retrieve the current state of the ZakatTracker object. It provides a snapshot of the internal data structure, allowing for further processing or analysis.
Returns: dict: A copy of the internal vault dictionary.
481 def stats(self) -> dict[str, tuple]: 482 """ 483 Calculates and returns statistics about the object's data storage. 484 485 This method determines the size of the database file on disk and the 486 size of the data currently held in RAM (likely within a dictionary). 487 Both sizes are reported in bytes and in a human-readable format 488 (e.g., KB, MB). 489 490 Returns: 491 dict[str, tuple]: A dictionary containing the following statistics: 492 493 * 'database': A tuple with two elements: 494 - The database file size in bytes (int). 495 - The database file size in human-readable format (str). 496 * 'ram': A tuple with two elements: 497 - The RAM usage (dictionary size) in bytes (int). 498 - The RAM usage in human-readable format (str). 499 500 Example: 501 >>> stats = my_object.stats() 502 >>> print(stats['database']) 503 (256000, '250.0 KB') 504 >>> print(stats['ram']) 505 (12345, '12.1 KB') 506 """ 507 ram_size = self.get_dict_size(self.vault()) 508 file_size = os.path.getsize(self.path()) 509 return { 510 'database': (file_size, self.human_readable_size(file_size)), 511 'ram': (ram_size, self.human_readable_size(ram_size)), 512 }
Calculates and returns statistics about the object's data storage.
This method determines the size of the database file on disk and the size of the data currently held in RAM (likely within a dictionary). Both sizes are reported in bytes and in a human-readable format (e.g., KB, MB).
Returns: dict[str, tuple]: A dictionary containing the following statistics:
* 'database': A tuple with two elements:
- The database file size in bytes (int).
- The database file size in human-readable format (str).
* 'ram': A tuple with two elements:
- The RAM usage (dictionary size) in bytes (int).
- The RAM usage in human-readable format (str).
Example:
>>> stats = my_object.stats()
>>> print(stats['database'])
(256000, '250.0 KB')
>>> print(stats['ram'])
(12345, '12.1 KB')
514 def steps(self) -> dict: 515 """ 516 Returns a copy of the history of steps taken in the ZakatTracker. 517 518 The history is a dictionary where each key is a unique identifier for a step, 519 and the corresponding value is a dictionary containing information about the step. 520 521 Returns: 522 dict: A copy of the history of steps taken in the ZakatTracker. 523 """ 524 return self._vault['history'].copy()
Returns a copy of the history of steps taken in the ZakatTracker.
The history is a dictionary where each key is a unique identifier for a step, and the corresponding value is a dictionary containing information about the step.
Returns: dict: A copy of the history of steps taken in the ZakatTracker.
526 def free(self, lock: int, auto_save: bool = True) -> bool: 527 """ 528 Releases the lock on the database. 529 530 Parameters: 531 lock (int): The lock ID to be released. 532 auto_save (bool): Whether to automatically save the database after releasing the lock. 533 534 Returns: 535 bool: True if the lock is successfully released and (optionally) saved, False otherwise. 536 """ 537 if lock == self._vault['lock']: 538 self._vault['lock'] = None 539 self.clean_history(lock) 540 if auto_save: 541 return self.save(self.path()) 542 return True 543 return False
Releases the lock on the database.
Parameters: lock (int): The lock ID to be released. auto_save (bool): Whether to automatically save the database after releasing the lock.
Returns: bool: True if the lock is successfully released and (optionally) saved, False otherwise.
545 def account_exists(self, account) -> bool: 546 """ 547 Check if the given account exists in the vault. 548 549 Parameters: 550 account (str): The account number to check. 551 552 Returns: 553 bool: True if the account exists, False otherwise. 554 """ 555 return account in self._vault['account']
Check if the given account exists in the vault.
Parameters: account (str): The account number to check.
Returns: bool: True if the account exists, False otherwise.
557 def box_size(self, account) -> int: 558 """ 559 Calculate the size of the box for a specific account. 560 561 Parameters: 562 account (str): The account number for which the box size needs to be calculated. 563 564 Returns: 565 int: The size of the box for the given account. If the account does not exist, -1 is returned. 566 """ 567 if self.account_exists(account): 568 return len(self._vault['account'][account]['box']) 569 return -1
Calculate the size of the box for a specific account.
Parameters: account (str): The account number for which the box size needs to be calculated.
Returns: int: The size of the box for the given account. If the account does not exist, -1 is returned.
571 def log_size(self, account) -> int: 572 """ 573 Get the size of the log for a specific account. 574 575 Parameters: 576 account (str): The account number for which the log size needs to be calculated. 577 578 Returns: 579 int: The size of the log for the given account. If the account does not exist, -1 is returned. 580 """ 581 if self.account_exists(account): 582 return len(self._vault['account'][account]['log']) 583 return -1
Get the size of the log for a specific account.
Parameters: account (str): The account number for which the log size needs to be calculated.
Returns: int: The size of the log for the given account. If the account does not exist, -1 is returned.
585 def recall(self, dry=True, debug=False) -> bool: 586 """ 587 Revert the last operation. 588 589 Parameters: 590 dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. 591 debug (bool): If True, the function will print debug information. Default is False. 592 593 Returns: 594 bool: True if the operation was successful, False otherwise. 595 """ 596 if not self.nolock() or len(self._vault['history']) == 0: 597 return False 598 if len(self._vault['history']) <= 0: 599 return False 600 ref = sorted(self._vault['history'].keys())[-1] 601 if debug: 602 print('recall', ref) 603 memory = self._vault['history'][ref] 604 if debug: 605 print(type(memory), 'memory', memory) 606 607 limit = len(memory) + 1 608 sub_positive_log_negative = 0 609 for i in range(-1, -limit, -1): 610 x = memory[i] 611 if debug: 612 print(type(x), x) 613 match x['action']: 614 case Action.CREATE: 615 if x['account'] is not None: 616 if self.account_exists(x['account']): 617 if debug: 618 print('account', self._vault['account'][x['account']]) 619 assert len(self._vault['account'][x['account']]['box']) == 0 620 assert self._vault['account'][x['account']]['balance'] == 0 621 assert self._vault['account'][x['account']]['count'] == 0 622 if dry: 623 continue 624 del self._vault['account'][x['account']] 625 626 case Action.TRACK: 627 if x['account'] is not None: 628 if self.account_exists(x['account']): 629 if dry: 630 continue 631 self._vault['account'][x['account']]['balance'] -= x['value'] 632 self._vault['account'][x['account']]['count'] -= 1 633 del self._vault['account'][x['account']]['box'][x['ref']] 634 635 case Action.LOG: 636 if x['account'] is not None: 637 if self.account_exists(x['account']): 638 if x['ref'] in self._vault['account'][x['account']]['log']: 639 if dry: 640 continue 641 if sub_positive_log_negative == -x['value']: 642 self._vault['account'][x['account']]['count'] -= 1 643 sub_positive_log_negative = 0 644 box_ref = self._vault['account'][x['account']]['log'][x['ref']]['ref'] 645 if not box_ref is None: 646 assert self.box_exists(x['account'], box_ref) 647 box_value = self._vault['account'][x['account']]['log'][x['ref']]['value'] 648 assert box_value < 0 649 self._vault['account'][x['account']]['box'][box_ref]['rest'] += -box_value 650 self._vault['account'][x['account']]['balance'] += -box_value 651 self._vault['account'][x['account']]['count'] -= 1 652 del self._vault['account'][x['account']]['log'][x['ref']] 653 654 case Action.SUB: 655 if x['account'] is not None: 656 if self.account_exists(x['account']): 657 if x['ref'] in self._vault['account'][x['account']]['box']: 658 if dry: 659 continue 660 self._vault['account'][x['account']]['box'][x['ref']]['rest'] += x['value'] 661 self._vault['account'][x['account']]['balance'] += x['value'] 662 sub_positive_log_negative = x['value'] 663 664 case Action.ADD_FILE: 665 if x['account'] is not None: 666 if self.account_exists(x['account']): 667 if x['ref'] in self._vault['account'][x['account']]['log']: 668 if x['file'] in self._vault['account'][x['account']]['log'][x['ref']]['file']: 669 if dry: 670 continue 671 del self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] 672 673 case Action.REMOVE_FILE: 674 if x['account'] is not None: 675 if self.account_exists(x['account']): 676 if x['ref'] in self._vault['account'][x['account']]['log']: 677 if dry: 678 continue 679 self._vault['account'][x['account']]['log'][x['ref']]['file'][x['file']] = x['value'] 680 681 case Action.BOX_TRANSFER: 682 if x['account'] is not None: 683 if self.account_exists(x['account']): 684 if x['ref'] in self._vault['account'][x['account']]['box']: 685 if dry: 686 continue 687 self._vault['account'][x['account']]['box'][x['ref']]['rest'] -= x['value'] 688 689 case Action.EXCHANGE: 690 if x['account'] is not None: 691 if x['account'] in self._vault['exchange']: 692 if x['ref'] in self._vault['exchange'][x['account']]: 693 if dry: 694 continue 695 del self._vault['exchange'][x['account']][x['ref']] 696 697 case Action.REPORT: 698 if x['ref'] in self._vault['report']: 699 if dry: 700 continue 701 del self._vault['report'][x['ref']] 702 703 case Action.ZAKAT: 704 if x['account'] is not None: 705 if self.account_exists(x['account']): 706 if x['ref'] in self._vault['account'][x['account']]['box']: 707 if x['key'] in self._vault['account'][x['account']]['box'][x['ref']]: 708 if dry: 709 continue 710 match x['math']: 711 case MathOperation.ADDITION: 712 self._vault['account'][x['account']]['box'][x['ref']][x['key']] -= x[ 713 'value'] 714 case MathOperation.EQUAL: 715 self._vault['account'][x['account']]['box'][x['ref']][x['key']] = x['value'] 716 case MathOperation.SUBTRACTION: 717 self._vault['account'][x['account']]['box'][x['ref']][x['key']] += x[ 718 'value'] 719 720 if not dry: 721 del self._vault['history'][ref] 722 return True
Revert the last operation.
Parameters: dry (bool): If True, the function will not modify the data, but will simulate the operation. Default is True. debug (bool): If True, the function will print debug information. Default is False.
Returns: bool: True if the operation was successful, False otherwise.
724 def ref_exists(self, account: str, ref_type: str, ref: int) -> bool: 725 """ 726 Check if a specific reference (transaction) exists in the vault for a given account and reference type. 727 728 Parameters: 729 account (str): The account number for which to check the existence of the reference. 730 ref_type (str): The type of reference (e.g., 'box', 'log', etc.). 731 ref (int): The reference (transaction) number to check for existence. 732 733 Returns: 734 bool: True if the reference exists for the given account and reference type, False otherwise. 735 """ 736 if account in self._vault['account']: 737 return ref in self._vault['account'][account][ref_type] 738 return False
Check if a specific reference (transaction) exists in the vault for a given account and reference type.
Parameters: account (str): The account number for which to check the existence of the reference. ref_type (str): The type of reference (e.g., 'box', 'log', etc.). ref (int): The reference (transaction) number to check for existence.
Returns: bool: True if the reference exists for the given account and reference type, False otherwise.
740 def box_exists(self, account: str, ref: int) -> bool: 741 """ 742 Check if a specific box (transaction) exists in the vault for a given account and reference. 743 744 Parameters: 745 - account (str): The account number for which to check the existence of the box. 746 - ref (int): The reference (transaction) number to check for existence. 747 748 Returns: 749 - bool: True if the box exists for the given account and reference, False otherwise. 750 """ 751 return self.ref_exists(account, 'box', ref)
Check if a specific box (transaction) exists in the vault for a given account and reference.
Parameters:
- account (str): The account number for which to check the existence of the box.
- ref (int): The reference (transaction) number to check for existence.
Returns:
- bool: True if the box exists for the given account and reference, False otherwise.
753 def track(self, value: float = 0, desc: str = '', account: str = 1, logging: bool = True, created: int = None, 754 debug: bool = False) -> int: 755 """ 756 This function tracks a transaction for a specific account. 757 758 Parameters: 759 value (float): The value of the transaction. Default is 0. 760 desc (str): The description of the transaction. Default is an empty string. 761 account (str): The account for which the transaction is being tracked. Default is '1'. 762 logging (bool): Whether to log the transaction. Default is True. 763 created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. 764 debug (bool): Whether to print debug information. Default is False. 765 766 Returns: 767 int: The timestamp of the transaction. 768 769 This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box. 770 771 Raises: 772 ValueError: The log transaction happened again in the same nanosecond time. 773 ValueError: The box transaction happened again in the same nanosecond time. 774 """ 775 if debug: 776 print('track', f'debug={debug}') 777 if created is None: 778 created = self.time() 779 no_lock = self.nolock() 780 self.lock() 781 if not self.account_exists(account): 782 if debug: 783 print(f"account {account} created") 784 self._vault['account'][account] = { 785 'balance': 0, 786 'box': {}, 787 'count': 0, 788 'log': {}, 789 'hide': False, 790 'zakatable': True, 791 } 792 self._step(Action.CREATE, account) 793 if value == 0: 794 if no_lock: 795 self.free(self.lock()) 796 return 0 797 if logging: 798 self._log(value=value, desc=desc, account=account, created=created, ref=None, debug=debug) 799 if debug: 800 print('create-box', created) 801 if self.box_exists(account, created): 802 raise ValueError(f"The box transaction happened again in the same nanosecond time({created}).") 803 if debug: 804 print('created-box', created) 805 self._vault['account'][account]['box'][created] = { 806 'capital': value, 807 'count': 0, 808 'last': 0, 809 'rest': value, 810 'total': 0, 811 } 812 self._step(Action.TRACK, account, ref=created, value=value) 813 if no_lock: 814 self.free(self.lock()) 815 return created
This function tracks a transaction for a specific account.
Parameters: value (float): The value of the transaction. Default is 0. desc (str): The description of the transaction. Default is an empty string. account (str): The account for which the transaction is being tracked. Default is '1'. logging (bool): Whether to log the transaction. Default is True. created (int): The timestamp of the transaction. If not provided, it will be generated. Default is None. debug (bool): Whether to print debug information. Default is False.
Returns: int: The timestamp of the transaction.
This function creates a new account if it doesn't exist, logs the transaction if logging is True, and updates the account's balance and box.
Raises: ValueError: The log transaction happened again in the same nanosecond time. ValueError: The box transaction happened again in the same nanosecond time.
817 def log_exists(self, account: str, ref: int) -> bool: 818 """ 819 Checks if a specific transaction log entry exists for a given account. 820 821 Parameters: 822 account (str): The account number associated with the transaction log. 823 ref (int): The reference to the transaction log entry. 824 825 Returns: 826 bool: True if the transaction log entry exists, False otherwise. 827 """ 828 return self.ref_exists(account, 'log', ref)
Checks if a specific transaction log entry exists for a given account.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry.
Returns: bool: True if the transaction log entry exists, False otherwise.
874 def exchange(self, account, created: int = None, rate: float = None, description: str = None, 875 debug: bool = False) -> dict: 876 """ 877 This method is used to record or retrieve exchange rates for a specific account. 878 879 Parameters: 880 - account (str): The account number for which the exchange rate is being recorded or retrieved. 881 - created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used. 882 - rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate. 883 - description (str): A description of the exchange rate. 884 885 Returns: 886 - dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, 887 it returns a dictionary with default values for the rate and description. 888 """ 889 if debug: 890 print('exchange', f'debug={debug}') 891 if created is None: 892 created = self.time() 893 no_lock = self.nolock() 894 self.lock() 895 if rate is not None: 896 if rate <= 0: 897 return dict() 898 if account not in self._vault['exchange']: 899 self._vault['exchange'][account] = {} 900 if len(self._vault['exchange'][account]) == 0 and rate <= 1: 901 return {"time": created, "rate": 1, "description": None} 902 self._vault['exchange'][account][created] = {"rate": rate, "description": description} 903 self._step(Action.EXCHANGE, account, ref=created, value=rate) 904 if no_lock: 905 self.free(self.lock()) 906 if debug: 907 print("exchange-created-1", 908 f'account: {account}, created: {created}, rate:{rate}, description:{description}') 909 910 if account in self._vault['exchange']: 911 valid_rates = [(ts, r) for ts, r in self._vault['exchange'][account].items() if ts <= created] 912 if valid_rates: 913 latest_rate = max(valid_rates, key=lambda x: x[0]) 914 if debug: 915 print("exchange-read-1", 916 f'account: {account}, created: {created}, rate:{rate}, description:{description}', 917 'latest_rate', latest_rate) 918 result = latest_rate[1] 919 result['time'] = latest_rate[0] 920 return result # إرجاع قاموس يحتوي على المعدل والوصف 921 if debug: 922 print("exchange-read-0", f'account: {account}, created: {created}, rate:{rate}, description:{description}') 923 return {"time": created, "rate": 1, "description": None} # إرجاع القيمة الافتراضية مع وصف فارغ
This method is used to record or retrieve exchange rates for a specific account.
Parameters:
- account (str): The account number for which the exchange rate is being recorded or retrieved.
- created (int): The timestamp of the exchange rate. If not provided, the current timestamp will be used.
- rate (float): The exchange rate to be recorded. If not provided, the method will retrieve the latest exchange rate.
- description (str): A description of the exchange rate.
Returns:
- dict: A dictionary containing the latest exchange rate and its description. If no exchange rate is found, it returns a dictionary with default values for the rate and description.
925 @staticmethod 926 def exchange_calc(x: float, x_rate: float, y_rate: float) -> float: 927 """ 928 This function calculates the exchanged amount of a currency. 929 930 Args: 931 x (float): The original amount of the currency. 932 x_rate (float): The exchange rate of the original currency. 933 y_rate (float): The exchange rate of the target currency. 934 935 Returns: 936 float: The exchanged amount of the target currency. 937 """ 938 return (x * x_rate) / y_rate
This function calculates the exchanged amount of a currency.
Args: x (float): The original amount of the currency. x_rate (float): The exchange rate of the original currency. y_rate (float): The exchange rate of the target currency.
Returns: float: The exchanged amount of the target currency.
940 def exchanges(self) -> dict: 941 """ 942 Retrieve the recorded exchange rates for all accounts. 943 944 Parameters: 945 None 946 947 Returns: 948 dict: A dictionary containing all recorded exchange rates. 949 The keys are account names or numbers, and the values are dictionaries containing the exchange rates. 950 Each exchange rate dictionary has timestamps as keys and exchange rate details as values. 951 """ 952 return self._vault['exchange'].copy()
Retrieve the recorded exchange rates for all accounts.
Parameters: None
Returns: dict: A dictionary containing all recorded exchange rates. The keys are account names or numbers, and the values are dictionaries containing the exchange rates. Each exchange rate dictionary has timestamps as keys and exchange rate details as values.
954 def accounts(self) -> dict: 955 """ 956 Returns a dictionary containing account numbers as keys and their respective balances as values. 957 958 Parameters: 959 None 960 961 Returns: 962 dict: A dictionary where keys are account numbers and values are their respective balances. 963 """ 964 result = {} 965 for i in self._vault['account']: 966 result[i] = self._vault['account'][i]['balance'] 967 return result
Returns a dictionary containing account numbers as keys and their respective balances as values.
Parameters: None
Returns: dict: A dictionary where keys are account numbers and values are their respective balances.
969 def boxes(self, account) -> dict: 970 """ 971 Retrieve the boxes (transactions) associated with a specific account. 972 973 Parameters: 974 account (str): The account number for which to retrieve the boxes. 975 976 Returns: 977 dict: A dictionary containing the boxes associated with the given account. 978 If the account does not exist, an empty dictionary is returned. 979 """ 980 if self.account_exists(account): 981 return self._vault['account'][account]['box'] 982 return {}
Retrieve the boxes (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the boxes.
Returns: dict: A dictionary containing the boxes associated with the given account. If the account does not exist, an empty dictionary is returned.
984 def logs(self, account) -> dict: 985 """ 986 Retrieve the logs (transactions) associated with a specific account. 987 988 Parameters: 989 account (str): The account number for which to retrieve the logs. 990 991 Returns: 992 dict: A dictionary containing the logs associated with the given account. 993 If the account does not exist, an empty dictionary is returned. 994 """ 995 if self.account_exists(account): 996 return self._vault['account'][account]['log'] 997 return {}
Retrieve the logs (transactions) associated with a specific account.
Parameters: account (str): The account number for which to retrieve the logs.
Returns: dict: A dictionary containing the logs associated with the given account. If the account does not exist, an empty dictionary is returned.
999 def add_file(self, account: str, ref: int, path: str) -> int: 1000 """ 1001 Adds a file reference to a specific transaction log entry in the vault. 1002 1003 Parameters: 1004 account (str): The account number associated with the transaction log. 1005 ref (int): The reference to the transaction log entry. 1006 path (str): The path of the file to be added. 1007 1008 Returns: 1009 int: The reference of the added file. If the account or transaction log entry does not exist, returns 0. 1010 """ 1011 if self.account_exists(account): 1012 if ref in self._vault['account'][account]['log']: 1013 file_ref = self.time() 1014 self._vault['account'][account]['log'][ref]['file'][file_ref] = path 1015 no_lock = self.nolock() 1016 self.lock() 1017 self._step(Action.ADD_FILE, account, ref=ref, file=file_ref) 1018 if no_lock: 1019 self.free(self.lock()) 1020 return file_ref 1021 return 0
Adds a file reference to a specific transaction log entry in the vault.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry. path (str): The path of the file to be added.
Returns: int: The reference of the added file. If the account or transaction log entry does not exist, returns 0.
1023 def remove_file(self, account: str, ref: int, file_ref: int) -> bool: 1024 """ 1025 Removes a file reference from a specific transaction log entry in the vault. 1026 1027 Parameters: 1028 account (str): The account number associated with the transaction log. 1029 ref (int): The reference to the transaction log entry. 1030 file_ref (int): The reference of the file to be removed. 1031 1032 Returns: 1033 bool: True if the file reference is successfully removed, False otherwise. 1034 """ 1035 if self.account_exists(account): 1036 if ref in self._vault['account'][account]['log']: 1037 if file_ref in self._vault['account'][account]['log'][ref]['file']: 1038 x = self._vault['account'][account]['log'][ref]['file'][file_ref] 1039 del self._vault['account'][account]['log'][ref]['file'][file_ref] 1040 no_lock = self.nolock() 1041 self.lock() 1042 self._step(Action.REMOVE_FILE, account, ref=ref, file=file_ref, value=x) 1043 if no_lock: 1044 self.free(self.lock()) 1045 return True 1046 return False
Removes a file reference from a specific transaction log entry in the vault.
Parameters: account (str): The account number associated with the transaction log. ref (int): The reference to the transaction log entry. file_ref (int): The reference of the file to be removed.
Returns: bool: True if the file reference is successfully removed, False otherwise.
1048 def balance(self, account: str = 1, cached: bool = True) -> int: 1049 """ 1050 Calculate and return the balance of a specific account. 1051 1052 Parameters: 1053 account (str): The account number. Default is '1'. 1054 cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True. 1055 1056 Returns: 1057 int: The balance of the account. 1058 1059 Note: 1060 If cached is True, the function returns the cached balance. 1061 If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items. 1062 """ 1063 if cached: 1064 return self._vault['account'][account]['balance'] 1065 x = 0 1066 return [x := x + y['rest'] for y in self._vault['account'][account]['box'].values()][-1]
Calculate and return the balance of a specific account.
Parameters: account (str): The account number. Default is '1'. cached (bool): If True, use the cached balance. If False, calculate the balance from the box. Default is True.
Returns: int: The balance of the account.
Note: If cached is True, the function returns the cached balance. If cached is False, the function calculates the balance from the box by summing up the 'rest' values of all box items.
1068 def hide(self, account, status: bool = None) -> bool: 1069 """ 1070 Check or set the hide status of a specific account. 1071 1072 Parameters: 1073 account (str): The account number. 1074 status (bool, optional): The new hide status. If not provided, the function will return the current status. 1075 1076 Returns: 1077 bool: The current or updated hide status of the account. 1078 1079 Raises: 1080 None 1081 1082 Example: 1083 >>> tracker = ZakatTracker() 1084 >>> ref = tracker.track(51, 'desc', 'account1') 1085 >>> tracker.hide('account1') # Set the hide status of 'account1' to True 1086 False 1087 >>> tracker.hide('account1', True) # Set the hide status of 'account1' to True 1088 True 1089 >>> tracker.hide('account1') # Get the hide status of 'account1' by default 1090 True 1091 >>> tracker.hide('account1', False) 1092 False 1093 """ 1094 if self.account_exists(account): 1095 if status is None: 1096 return self._vault['account'][account]['hide'] 1097 self._vault['account'][account]['hide'] = status 1098 return status 1099 return False
Check or set the hide status of a specific account.
Parameters: account (str): The account number. status (bool, optional): The new hide status. If not provided, the function will return the current status.
Returns: bool: The current or updated hide status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.hide('account1') # Set the hide status of 'account1' to True
False
>>> tracker.hide('account1', True) # Set the hide status of 'account1' to True
True
>>> tracker.hide('account1') # Get the hide status of 'account1' by default
True
>>> tracker.hide('account1', False)
False
1101 def zakatable(self, account, status: bool = None) -> bool: 1102 """ 1103 Check or set the zakatable status of a specific account. 1104 1105 Parameters: 1106 account (str): The account number. 1107 status (bool, optional): The new zakatable status. If not provided, the function will return the current status. 1108 1109 Returns: 1110 bool: The current or updated zakatable status of the account. 1111 1112 Raises: 1113 None 1114 1115 Example: 1116 >>> tracker = ZakatTracker() 1117 >>> ref = tracker.track(51, 'desc', 'account1') 1118 >>> tracker.zakatable('account1') # Set the zakatable status of 'account1' to True 1119 True 1120 >>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True 1121 True 1122 >>> tracker.zakatable('account1') # Get the zakatable status of 'account1' by default 1123 True 1124 >>> tracker.zakatable('account1', False) 1125 False 1126 """ 1127 if self.account_exists(account): 1128 if status is None: 1129 return self._vault['account'][account]['zakatable'] 1130 self._vault['account'][account]['zakatable'] = status 1131 return status 1132 return False
Check or set the zakatable status of a specific account.
Parameters: account (str): The account number. status (bool, optional): The new zakatable status. If not provided, the function will return the current status.
Returns: bool: The current or updated zakatable status of the account.
Raises: None
Example:
>>> tracker = ZakatTracker()
>>> ref = tracker.track(51, 'desc', 'account1')
>>> tracker.zakatable('account1') # Set the zakatable status of 'account1' to True
True
>>> tracker.zakatable('account1', True) # Set the zakatable status of 'account1' to True
True
>>> tracker.zakatable('account1') # Get the zakatable status of 'account1' by default
True
>>> tracker.zakatable('account1', False)
False
1134 def sub(self, x: float, desc: str = '', account: str = 1, created: int = None, debug: bool = False) -> tuple: 1135 """ 1136 Subtracts a specified value from an account's balance. 1137 1138 Parameters: 1139 x (float): The amount to be subtracted. 1140 desc (str): A description for the transaction. Defaults to an empty string. 1141 account (str): The account from which the value will be subtracted. Defaults to '1'. 1142 created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. 1143 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1144 1145 Returns: 1146 tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction. 1147 1148 If the amount to subtract is greater than the account's balance, 1149 the remaining amount will be transferred to a new transaction with a negative value. 1150 1151 Raises: 1152 ValueError: The box transaction happened again in the same nanosecond time. 1153 ValueError: The log transaction happened again in the same nanosecond time. 1154 """ 1155 if debug: 1156 print('sub', f'debug={debug}') 1157 if x < 0: 1158 return tuple() 1159 if x == 0: 1160 ref = self.track(x, '', account) 1161 return ref, ref 1162 if created is None: 1163 created = self.time() 1164 no_lock = self.nolock() 1165 self.lock() 1166 self.track(0, '', account) 1167 self._log(value=-x, desc=desc, account=account, created=created, ref=None, debug=debug) 1168 ids = sorted(self._vault['account'][account]['box'].keys()) 1169 limit = len(ids) + 1 1170 target = x 1171 if debug: 1172 print('ids', ids) 1173 ages = [] 1174 for i in range(-1, -limit, -1): 1175 if target == 0: 1176 break 1177 j = ids[i] 1178 if debug: 1179 print('i', i, 'j', j) 1180 rest = self._vault['account'][account]['box'][j]['rest'] 1181 if rest >= target: 1182 self._vault['account'][account]['box'][j]['rest'] -= target 1183 self._step(Action.SUB, account, ref=j, value=target) 1184 ages.append((j, target)) 1185 target = 0 1186 break 1187 elif target > rest > 0: 1188 chunk = rest 1189 target -= chunk 1190 self._step(Action.SUB, account, ref=j, value=chunk) 1191 ages.append((j, chunk)) 1192 self._vault['account'][account]['box'][j]['rest'] = 0 1193 if target > 0: 1194 self.track(-target, desc, account, False, created) 1195 ages.append((created, target)) 1196 if no_lock: 1197 self.free(self.lock()) 1198 return created, ages
Subtracts a specified value from an account's balance.
Parameters: x (float): The amount to be subtracted. desc (str): A description for the transaction. Defaults to an empty string. account (str): The account from which the value will be subtracted. Defaults to '1'. created (int): The timestamp of the transaction. If not provided, the current timestamp will be used. debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: tuple: A tuple containing the timestamp of the transaction and a list of tuples representing the age of each transaction.
If the amount to subtract is greater than the account's balance, the remaining amount will be transferred to a new transaction with a negative value.
Raises: ValueError: The box transaction happened again in the same nanosecond time. ValueError: The log transaction happened again in the same nanosecond time.
1200 def transfer(self, amount: int, from_account: str, to_account: str, desc: str = '', created: int = None, 1201 debug: bool = False) -> list[int]: 1202 """ 1203 Transfers a specified value from one account to another. 1204 1205 Parameters: 1206 amount (int): The amount to be transferred. 1207 from_account (str): The account from which the value will be transferred. 1208 to_account (str): The account to which the value will be transferred. 1209 desc (str, optional): A description for the transaction. Defaults to an empty string. 1210 created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. 1211 debug (bool): A flag indicating whether to print debug information. Defaults to False. 1212 1213 Returns: 1214 list[int]: A list of timestamps corresponding to the transactions made during the transfer. 1215 1216 Raises: 1217 ValueError: Transfer to the same account is forbidden. 1218 ValueError: The box transaction happened again in the same nanosecond time. 1219 ValueError: The log transaction happened again in the same nanosecond time. 1220 """ 1221 if debug: 1222 print('transfer', f'debug={debug}') 1223 if from_account == to_account: 1224 raise ValueError(f'Transfer to the same account is forbidden. {to_account}') 1225 if amount <= 0: 1226 return [] 1227 if created is None: 1228 created = self.time() 1229 (_, ages) = self.sub(amount, desc, from_account, created, debug=debug) 1230 times = [] 1231 source_exchange = self.exchange(from_account, created) 1232 target_exchange = self.exchange(to_account, created) 1233 1234 if debug: 1235 print('ages', ages) 1236 1237 for age, value in ages: 1238 target_amount = self.exchange_calc(value, source_exchange['rate'], target_exchange['rate']) 1239 # Perform the transfer 1240 if self.box_exists(to_account, age): 1241 if debug: 1242 print('box_exists', age) 1243 capital = self._vault['account'][to_account]['box'][age]['capital'] 1244 rest = self._vault['account'][to_account]['box'][age]['rest'] 1245 if debug: 1246 print( 1247 f"Transfer {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1248 selected_age = age 1249 if rest + target_amount > capital: 1250 self._vault['account'][to_account]['box'][age]['capital'] += target_amount 1251 selected_age = ZakatTracker.time() 1252 self._vault['account'][to_account]['box'][age]['rest'] += target_amount 1253 self._step(Action.BOX_TRANSFER, to_account, ref=selected_age, value=target_amount) 1254 y = self._log(value=target_amount, desc=f'TRANSFER {from_account} -> {to_account}', account=to_account, 1255 created=None, ref=None, debug=debug) 1256 times.append((age, y)) 1257 continue 1258 y = self.track(target_amount, desc, to_account, logging=True, created=age, debug=debug) 1259 if debug: 1260 print( 1261 f"Transferred {value} from `{from_account}` to `{to_account}` (equivalent to {target_amount} `{to_account}`).") 1262 times.append(y) 1263 return times
Transfers a specified value from one account to another.
Parameters: amount (int): The amount to be transferred. from_account (str): The account from which the value will be transferred. to_account (str): The account to which the value will be transferred. desc (str, optional): A description for the transaction. Defaults to an empty string. created (int, optional): The timestamp of the transaction. If not provided, the current timestamp will be used. debug (bool): A flag indicating whether to print debug information. Defaults to False.
Returns: list[int]: A list of timestamps corresponding to the transactions made during the transfer.
Raises: ValueError: Transfer to the same account is forbidden. ValueError: The box transaction happened again in the same nanosecond time. ValueError: The log transaction happened again in the same nanosecond time.
1265 def check(self, silver_gram_price: float, nisab: float = None, debug: bool = False, now: int = None, 1266 cycle: float = None) -> tuple: 1267 """ 1268 Check the eligibility for Zakat based on the given parameters. 1269 1270 Parameters: 1271 silver_gram_price (float): The price of a gram of silver. 1272 nisab (float): The minimum amount of wealth required for Zakat. If not provided, 1273 it will be calculated based on the silver_gram_price. 1274 debug (bool): Flag to enable debug mode. 1275 now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). 1276 cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle(). 1277 1278 Returns: 1279 tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, 1280 and a dictionary containing the Zakat plan. 1281 """ 1282 if debug: 1283 print('check', f'debug={debug}') 1284 if now is None: 1285 now = self.time() 1286 if cycle is None: 1287 cycle = ZakatTracker.TimeCycle() 1288 if nisab is None: 1289 nisab = ZakatTracker.Nisab(silver_gram_price) 1290 plan = {} 1291 below_nisab = 0 1292 brief = [0, 0, 0] 1293 valid = False 1294 if debug: 1295 print('exchanges', self.exchanges()) 1296 for x in self._vault['account']: 1297 if not self.zakatable(x): 1298 continue 1299 _box = self._vault['account'][x]['box'] 1300 _log = self._vault['account'][x]['log'] 1301 limit = len(_box) + 1 1302 ids = sorted(self._vault['account'][x]['box'].keys()) 1303 for i in range(-1, -limit, -1): 1304 j = ids[i] 1305 rest = float(_box[j]['rest']) 1306 if rest <= 0: 1307 continue 1308 exchange = self.exchange(x, created=self.time()) 1309 rest = ZakatTracker.exchange_calc(rest, float(exchange['rate']), 1) 1310 brief[0] += rest 1311 index = limit + i - 1 1312 epoch = (now - j) / cycle 1313 if debug: 1314 print(f"Epoch: {epoch}", _box[j]) 1315 if _box[j]['last'] > 0: 1316 epoch = (now - _box[j]['last']) / cycle 1317 if debug: 1318 print(f"Epoch: {epoch}") 1319 epoch = floor(epoch) 1320 if debug: 1321 print(f"Epoch: {epoch}", type(epoch), epoch == 0, 1 - epoch, epoch) 1322 if epoch == 0: 1323 continue 1324 if debug: 1325 print("Epoch - PASSED") 1326 brief[1] += rest 1327 if rest >= nisab: 1328 total = 0 1329 for _ in range(epoch): 1330 total += ZakatTracker.ZakatCut(float(rest) - float(total)) 1331 if total > 0: 1332 if x not in plan: 1333 plan[x] = {} 1334 valid = True 1335 brief[2] += total 1336 plan[x][index] = { 1337 'total': total, 1338 'count': epoch, 1339 'box_time': j, 1340 'box_capital': _box[j]['capital'], 1341 'box_rest': _box[j]['rest'], 1342 'box_last': _box[j]['last'], 1343 'box_total': _box[j]['total'], 1344 'box_count': _box[j]['count'], 1345 'box_log': _log[j]['desc'], 1346 'exchange_rate': exchange['rate'], 1347 'exchange_time': exchange['time'], 1348 'exchange_desc': exchange['description'], 1349 } 1350 else: 1351 chunk = ZakatTracker.ZakatCut(float(rest)) 1352 if chunk > 0: 1353 if x not in plan: 1354 plan[x] = {} 1355 if j not in plan[x].keys(): 1356 plan[x][index] = {} 1357 below_nisab += rest 1358 brief[2] += chunk 1359 plan[x][index]['below_nisab'] = chunk 1360 plan[x][index]['total'] = chunk 1361 plan[x][index]['count'] = epoch 1362 plan[x][index]['box_time'] = j 1363 plan[x][index]['box_capital'] = _box[j]['capital'] 1364 plan[x][index]['box_rest'] = _box[j]['rest'] 1365 plan[x][index]['box_last'] = _box[j]['last'] 1366 plan[x][index]['box_total'] = _box[j]['total'] 1367 plan[x][index]['box_count'] = _box[j]['count'] 1368 plan[x][index]['box_log'] = _log[j]['desc'] 1369 plan[x][index]['exchange_rate'] = exchange['rate'] 1370 plan[x][index]['exchange_time'] = exchange['time'] 1371 plan[x][index]['exchange_desc'] = exchange['description'] 1372 valid = valid or below_nisab >= nisab 1373 if debug: 1374 print(f"below_nisab({below_nisab}) >= nisab({nisab})") 1375 return valid, brief, plan
Check the eligibility for Zakat based on the given parameters.
Parameters: silver_gram_price (float): The price of a gram of silver. nisab (float): The minimum amount of wealth required for Zakat. If not provided, it will be calculated based on the silver_gram_price. debug (bool): Flag to enable debug mode. now (int): The current timestamp. If not provided, it will be calculated using ZakatTracker.time(). cycle (float): The time cycle for Zakat. If not provided, it will be calculated using ZakatTracker.TimeCycle().
Returns: tuple: A tuple containing a boolean indicating the eligibility for Zakat, a list of brief statistics, and a dictionary containing the Zakat plan.
1377 def build_payment_parts(self, demand: float, positive_only: bool = True) -> dict: 1378 """ 1379 Build payment parts for the Zakat distribution. 1380 1381 Parameters: 1382 demand (float): The total demand for payment in local currency. 1383 positive_only (bool): If True, only consider accounts with positive balance. Default is True. 1384 1385 Returns: 1386 dict: A dictionary containing the payment parts for each account. The dictionary has the following structure: 1387 { 1388 'account': { 1389 'account_id': {'balance': float, 'rate': float, 'part': float}, 1390 ... 1391 }, 1392 'exceed': bool, 1393 'demand': float, 1394 'total': float, 1395 } 1396 """ 1397 total = 0 1398 parts = { 1399 'account': {}, 1400 'exceed': False, 1401 'demand': demand, 1402 } 1403 for x, y in self.accounts().items(): 1404 if positive_only and y <= 0: 1405 continue 1406 total += float(y) 1407 exchange = self.exchange(x) 1408 parts['account'][x] = {'balance': y, 'rate': exchange['rate'], 'part': 0} 1409 parts['total'] = total 1410 return parts
Build payment parts for the Zakat distribution.
Parameters: demand (float): The total demand for payment in local currency. positive_only (bool): If True, only consider accounts with positive balance. Default is True.
Returns: dict: A dictionary containing the payment parts for each account. The dictionary has the following structure: { 'account': { 'account_id': {'balance': float, 'rate': float, 'part': float}, ... }, 'exceed': bool, 'demand': float, 'total': float, }
1412 @staticmethod 1413 def check_payment_parts(parts: dict, debug: bool = False) -> int: 1414 """ 1415 Checks the validity of payment parts. 1416 1417 Parameters: 1418 parts (dict): A dictionary containing payment parts information. 1419 debug (bool): Flag to enable debug mode. 1420 1421 Returns: 1422 int: Returns 0 if the payment parts are valid, otherwise returns the error code. 1423 1424 Error Codes: 1425 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 1426 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 1427 3: 'part' value in parts['account'][x] is less than 0. 1428 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 1429 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 1430 6: The sum of 'part' values in parts['account'] does not match with 'demand' value. 1431 """ 1432 if debug: 1433 print('check_payment_parts', f'debug={debug}') 1434 for i in ['demand', 'account', 'total', 'exceed']: 1435 if i not in parts: 1436 return 1 1437 exceed = parts['exceed'] 1438 for x in parts['account']: 1439 for j in ['balance', 'rate', 'part']: 1440 if j not in parts['account'][x]: 1441 return 2 1442 if parts['account'][x]['part'] < 0: 1443 return 3 1444 if not exceed and parts['account'][x]['balance'] <= 0: 1445 return 4 1446 demand = parts['demand'] 1447 z = 0 1448 for _, y in parts['account'].items(): 1449 if not exceed and y['part'] > y['balance']: 1450 return 5 1451 z += ZakatTracker.exchange_calc(y['part'], y['rate'], 1) 1452 z = round(z, 2) 1453 demand = round(demand, 2) 1454 if debug: 1455 print('check_payment_parts', f'z = {z}, demand = {demand}') 1456 print('check_payment_parts', type(z), type(demand)) 1457 print('check_payment_parts', z != demand) 1458 print('check_payment_parts', str(z) != str(demand)) 1459 if z != demand and str(z) != str(demand): 1460 return 6 1461 return 0
Checks the validity of payment parts.
Parameters: parts (dict): A dictionary containing payment parts information. debug (bool): Flag to enable debug mode.
Returns: int: Returns 0 if the payment parts are valid, otherwise returns the error code.
Error Codes: 1: 'demand', 'account', 'total', or 'exceed' key is missing in parts. 2: 'balance', 'rate' or 'part' key is missing in parts['account'][x]. 3: 'part' value in parts['account'][x] is less than 0. 4: If 'exceed' is False, 'balance' value in parts['account'][x] is less than or equal to 0. 5: If 'exceed' is False, 'part' value in parts['account'][x] is greater than 'balance' value. 6: The sum of 'part' values in parts['account'] does not match with 'demand' value.
1463 def zakat(self, report: tuple, parts: Dict[str, Dict | bool | Any] = None, debug: bool = False) -> bool: 1464 """ 1465 Perform Zakat calculation based on the given report and optional parts. 1466 1467 Parameters: 1468 report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. 1469 parts (dict): A dictionary containing the payment parts for the zakat. 1470 debug (bool): A flag indicating whether to print debug information. 1471 1472 Returns: 1473 bool: True if the zakat calculation is successful, False otherwise. 1474 """ 1475 if debug: 1476 print('zakat', f'debug={debug}') 1477 valid, _, plan = report 1478 if not valid: 1479 return valid 1480 parts_exist = parts is not None 1481 if parts_exist: 1482 if self.check_payment_parts(parts, debug=debug) != 0: 1483 return False 1484 if debug: 1485 print('######### zakat #######') 1486 print('parts_exist', parts_exist) 1487 no_lock = self.nolock() 1488 self.lock() 1489 report_time = self.time() 1490 self._vault['report'][report_time] = report 1491 self._step(Action.REPORT, ref=report_time) 1492 created = self.time() 1493 for x in plan: 1494 target_exchange = self.exchange(x) 1495 if debug: 1496 print(plan[x]) 1497 print('-------------') 1498 print(self._vault['account'][x]['box']) 1499 ids = sorted(self._vault['account'][x]['box'].keys()) 1500 if debug: 1501 print('plan[x]', plan[x]) 1502 for i in plan[x].keys(): 1503 j = ids[i] 1504 if debug: 1505 print('i', i, 'j', j) 1506 self._step(Action.ZAKAT, account=x, ref=j, value=self._vault['account'][x]['box'][j]['last'], 1507 key='last', 1508 math_operation=MathOperation.EQUAL) 1509 self._vault['account'][x]['box'][j]['last'] = created 1510 amount = ZakatTracker.exchange_calc(float(plan[x][i]['total']), 1, float(target_exchange['rate'])) 1511 self._vault['account'][x]['box'][j]['total'] += amount 1512 self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='total', 1513 math_operation=MathOperation.ADDITION) 1514 self._vault['account'][x]['box'][j]['count'] += plan[x][i]['count'] 1515 self._step(Action.ZAKAT, account=x, ref=j, value=plan[x][i]['count'], key='count', 1516 math_operation=MathOperation.ADDITION) 1517 if not parts_exist: 1518 try: 1519 self._vault['account'][x]['box'][j]['rest'] -= amount 1520 except TypeError: 1521 self._vault['account'][x]['box'][j]['rest'] -= Decimal(amount) 1522 # self._step(Action.ZAKAT, account=x, ref=j, value=amount, key='rest', 1523 # math_operation=MathOperation.SUBTRACTION) 1524 self._log(-float(amount), desc='zakat-زكاة', account=x, created=None, ref=j, debug=debug) 1525 if parts_exist: 1526 for account, part in parts['account'].items(): 1527 if part['part'] == 0: 1528 continue 1529 if debug: 1530 print('zakat-part', account, part['rate']) 1531 target_exchange = self.exchange(account) 1532 amount = ZakatTracker.exchange_calc(part['part'], part['rate'], target_exchange['rate']) 1533 self.sub(amount, desc='zakat-part-دفعة-زكاة', account=account, debug=debug) 1534 if no_lock: 1535 self.free(self.lock()) 1536 return True
Perform Zakat calculation based on the given report and optional parts.
Parameters: report (tuple): A tuple containing the validity of the report, the report data, and the zakat plan. parts (dict): A dictionary containing the payment parts for the zakat. debug (bool): A flag indicating whether to print debug information.
Returns: bool: True if the zakat calculation is successful, False otherwise.
1538 def export_json(self, path: str = "data.json") -> bool: 1539 """ 1540 Exports the current state of the ZakatTracker object to a JSON file. 1541 1542 Parameters: 1543 path (str): The path where the JSON file will be saved. Default is "data.json". 1544 1545 Returns: 1546 bool: True if the export is successful, False otherwise. 1547 1548 Raises: 1549 No specific exceptions are raised by this method. 1550 """ 1551 with open(path, "w") as file: 1552 json.dump(self._vault, file, indent=4, cls=JSONEncoder) 1553 return True
Exports the current state of the ZakatTracker object to a JSON file.
Parameters: path (str): The path where the JSON file will be saved. Default is "data.json".
Returns: bool: True if the export is successful, False otherwise.
Raises: No specific exceptions are raised by this method.
1555 def save(self, path: str = None) -> bool: 1556 """ 1557 Saves the ZakatTracker's current state to a pickle file. 1558 1559 This method serializes the internal data (`_vault`) along with metadata 1560 (Python version, pickle protocol) for future compatibility. 1561 1562 Parameters: 1563 path (str, optional): File path for saving. Defaults to a predefined location. 1564 1565 Returns: 1566 bool: True if the save operation is successful, False otherwise. 1567 """ 1568 if path is None: 1569 path = self.path() 1570 with open(path, "wb") as f: 1571 version = f'{version_info.major}.{version_info.minor}.{version_info.micro}' 1572 pickle_protocol = pickle.HIGHEST_PROTOCOL 1573 data = { 1574 'python_version': version, 1575 'pickle_protocol': pickle_protocol, 1576 'data': self._vault, 1577 } 1578 pickle.dump(data, f, protocol=pickle_protocol) 1579 return True
Saves the ZakatTracker's current state to a pickle file.
This method serializes the internal data (_vault
) along with metadata
(Python version, pickle protocol) for future compatibility.
Parameters: path (str, optional): File path for saving. Defaults to a predefined location.
Returns: bool: True if the save operation is successful, False otherwise.
1581 def load(self, path: str = None) -> bool: 1582 """ 1583 Load the current state of the ZakatTracker object from a pickle file. 1584 1585 Parameters: 1586 path (str): The path where the pickle file is located. If not provided, it will use the default path. 1587 1588 Returns: 1589 bool: True if the load operation is successful, False otherwise. 1590 """ 1591 if path is None: 1592 path = self.path() 1593 if os.path.exists(path): 1594 with open(path, "rb") as f: 1595 data = pickle.load(f) 1596 self._vault = data['data'] 1597 return True 1598 return False
Load the current state of the ZakatTracker object from a pickle file.
Parameters: path (str): The path where the pickle file is located. If not provided, it will use the default path.
Returns: bool: True if the load operation is successful, False otherwise.
1600 def import_csv_cache_path(self): 1601 """ 1602 Generates the cache file path for imported CSV data. 1603 1604 This function constructs the file path where cached data from CSV imports 1605 will be stored. The cache file is a pickle file (.pickle extension) appended 1606 to the base path of the object. 1607 1608 Returns: 1609 str: The full path to the import CSV cache file. 1610 1611 Example: 1612 >>> obj = ZakatTracker('/data/reports') 1613 >>> obj.import_csv_cache_path() 1614 '/data/reports.import_csv.pickle' 1615 """ 1616 path = self.path() 1617 if path.endswith(".pickle"): 1618 path = path[:-7] 1619 return path + '.import_csv.pickle'
Generates the cache file path for imported CSV data.
This function constructs the file path where cached data from CSV imports will be stored. The cache file is a pickle file (.pickle extension) appended to the base path of the object.
Returns: str: The full path to the import CSV cache file.
Example:
obj = ZakatTracker('/data/reports') obj.import_csv_cache_path() '/data/reports.import_csv.pickle'
1621 def import_csv(self, path: str = 'file.csv', debug: bool = False) -> tuple: 1622 """ 1623 The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system. 1624 1625 Parameters: 1626 path (str): The path to the CSV file. Default is 'file.csv'. 1627 debug (bool): A flag indicating whether to print debug information. 1628 1629 Returns: 1630 tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, 1631 and a dictionary of bad transactions. 1632 1633 Notes: 1634 * Currency Pair Assumption: This function assumes that the exchange rates stored for each account 1635 are appropriate for the currency pairs involved in the conversions. 1636 * The exchange rate for each account is based on the last encountered transaction rate that is not equal 1637 to 1.0 or the previous rate for that account. 1638 * Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent 1639 transactions of the same account within the whole imported and existing dataset when doing `check` and 1640 `zakat` operations. 1641 1642 Example Usage: 1643 The CSV file should have the following format, rate is optional per transaction: 1644 account, desc, value, date, rate 1645 For example: 1646 safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1 1647 """ 1648 if debug: 1649 print('import_csv', f'debug={debug}') 1650 cache: list[int] = [] 1651 try: 1652 with open(self.import_csv_cache_path(), "rb") as f: 1653 cache = pickle.load(f) 1654 except: 1655 pass 1656 date_formats = [ 1657 "%Y-%m-%d %H:%M:%S", 1658 "%Y-%m-%dT%H:%M:%S", 1659 "%Y-%m-%dT%H%M%S", 1660 "%Y-%m-%d", 1661 ] 1662 created, found, bad = 0, 0, {} 1663 data: dict[int, list] = {} 1664 with open(path, newline='', encoding="utf-8") as f: 1665 i = 0 1666 for row in csv.reader(f, delimiter=','): 1667 i += 1 1668 hashed = hash(tuple(row)) 1669 if hashed in cache: 1670 found += 1 1671 continue 1672 account = row[0] 1673 desc = row[1] 1674 value = float(row[2]) 1675 rate = 1.0 1676 if row[4:5]: # Empty list if index is out of range 1677 rate = float(row[4]) 1678 date: int = 0 1679 for time_format in date_formats: 1680 try: 1681 date = self.time(datetime.datetime.strptime(row[3], time_format)) 1682 break 1683 except: 1684 pass 1685 # TODO: not allowed for negative dates 1686 if date == 0 or value == 0: 1687 bad[i] = row 1688 continue 1689 if date not in data: 1690 data[date] = [] 1691 # TODO: If duplicated time with different accounts with the same amount it is an indicator of a transfer 1692 data[date].append((date, value, desc, account, rate, hashed)) 1693 1694 if debug: 1695 print('import_csv', len(data)) 1696 1697 def process(row, index=0): 1698 nonlocal created 1699 (date, value, desc, account, rate, hashed) = row 1700 date += index 1701 if rate > 1: 1702 self.exchange(account, created=date, rate=rate) 1703 if value > 0: 1704 self.track(value, desc, account, True, date) 1705 elif value < 0: 1706 self.sub(-value, desc, account, date) 1707 created += 1 1708 cache.append(hashed) 1709 1710 for date, rows in sorted(data.items()): 1711 len_rows = len(rows) 1712 if len_rows == 1: 1713 process(rows[0]) 1714 continue 1715 if debug: 1716 print('-- Duplicated time detected', date, 'len', len_rows) 1717 print(rows) 1718 print('---------------------------------') 1719 for index, row in enumerate(rows): 1720 process(row, index) 1721 with open(self.import_csv_cache_path(), "wb") as f: 1722 pickle.dump(cache, f) 1723 return created, found, bad
The function reads the CSV file, checks for duplicate transactions, and creates the transactions in the system.
Parameters: path (str): The path to the CSV file. Default is 'file.csv'. debug (bool): A flag indicating whether to print debug information.
Returns: tuple: A tuple containing the number of transactions created, the number of transactions found in the cache, and a dictionary of bad transactions.
Notes:
* Currency Pair Assumption: This function assumes that the exchange rates stored for each account
are appropriate for the currency pairs involved in the conversions.
* The exchange rate for each account is based on the last encountered transaction rate that is not equal
to 1.0 or the previous rate for that account.
* Those rates will be merged into the exchange rates main data, and later it will be used for all subsequent
transactions of the same account within the whole imported and existing dataset when doing check
and
zakat
operations.
Example Usage: The CSV file should have the following format, rate is optional per transaction: account, desc, value, date, rate For example: safe-45, "Some text", 34872, 1988-06-30 00:00:00, 1
1729 @staticmethod 1730 def human_readable_size(size: float, decimal_places: int = 2) -> str: 1731 """ 1732 Converts a size in bytes to a human-readable format (e.g., KB, MB, GB). 1733 1734 This function iterates through progressively larger units of information 1735 (B, KB, MB, GB, etc.) and divides the input size until it fits within a 1736 range that can be expressed with a reasonable number before the unit. 1737 1738 Parameters: 1739 size (float): The size in bytes to convert. 1740 decimal_places (int, optional): The number of decimal places to display 1741 in the result. Defaults to 2. 1742 1743 Returns: 1744 str: A string representation of the size in a human-readable format, 1745 rounded to the specified number of decimal places. For example: 1746 - "1.50 KB" (1536 bytes) 1747 - "23.00 MB" (24117248 bytes) 1748 - "1.23 GB" (1325899906 bytes) 1749 """ 1750 if type(size) not in (float, int): 1751 raise TypeError("size must be a float or integer") 1752 if type(decimal_places) != int: 1753 raise TypeError("decimal_places must be an integer") 1754 for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']: 1755 if size < 1024.0: 1756 break 1757 size /= 1024.0 1758 return f"{size:.{decimal_places}f} {unit}"
Converts a size in bytes to a human-readable format (e.g., KB, MB, GB).
This function iterates through progressively larger units of information (B, KB, MB, GB, etc.) and divides the input size until it fits within a range that can be expressed with a reasonable number before the unit.
Parameters: size (float): The size in bytes to convert. decimal_places (int, optional): The number of decimal places to display in the result. Defaults to 2.
Returns: str: A string representation of the size in a human-readable format, rounded to the specified number of decimal places. For example: - "1.50 KB" (1536 bytes) - "23.00 MB" (24117248 bytes) - "1.23 GB" (1325899906 bytes)
1760 @staticmethod 1761 def get_dict_size(obj: dict, seen: set = None) -> float: 1762 """ 1763 Recursively calculates the approximate memory size of a dictionary and its contents in bytes. 1764 1765 This function traverses the dictionary structure, accounting for the size of keys, values, 1766 and any nested objects. It handles various data types commonly found in dictionaries 1767 (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case 1768 of circular references. 1769 1770 Parameters: 1771 obj (dict): The dictionary whose size is to be calculated. 1772 seen (set, optional): A set used internally to track visited objects 1773 and avoid circular references. Defaults to None. 1774 1775 Returns: 1776 float: An approximate size of the dictionary and its contents in bytes. 1777 1778 Note: 1779 - This function is a method of the `ZakatTracker` class and is likely used to 1780 estimate the memory footprint of data structures relevant to Zakat calculations. 1781 - The size calculation is approximate as it relies on `sys.getsizeof()`, which might 1782 not account for all memory overhead depending on the Python implementation. 1783 - Circular references are handled to prevent infinite recursion. 1784 - Basic numeric types (int, float, complex) are assumed to have fixed sizes. 1785 - String sizes are estimated based on character length and encoding. 1786 """ 1787 size = 0 1788 if seen is None: 1789 seen = set() 1790 1791 obj_id = id(obj) 1792 if obj_id in seen: 1793 return 0 1794 1795 seen.add(obj_id) 1796 size += sys.getsizeof(obj) 1797 1798 if isinstance(obj, dict): 1799 for k, v in obj.items(): 1800 size += ZakatTracker.get_dict_size(k, seen) 1801 size += ZakatTracker.get_dict_size(v, seen) 1802 elif isinstance(obj, (list, tuple, set, frozenset)): 1803 for item in obj: 1804 size += ZakatTracker.get_dict_size(item, seen) 1805 elif isinstance(obj, (int, float, complex)): # Handle numbers 1806 pass # Basic numbers have a fixed size, so nothing to add here 1807 elif isinstance(obj, str): # Handle strings 1808 size += len(obj) * sys.getsizeof(str().encode()) # Size per character in bytes 1809 return size
Recursively calculates the approximate memory size of a dictionary and its contents in bytes.
This function traverses the dictionary structure, accounting for the size of keys, values, and any nested objects. It handles various data types commonly found in dictionaries (e.g., lists, tuples, sets, numbers, strings) and prevents infinite recursion in case of circular references.
Parameters: obj (dict): The dictionary whose size is to be calculated. seen (set, optional): A set used internally to track visited objects and avoid circular references. Defaults to None.
Returns: float: An approximate size of the dictionary and its contents in bytes.
Note:
- This function is a method of the
ZakatTracker
class and is likely used to estimate the memory footprint of data structures relevant to Zakat calculations. - The size calculation is approximate as it relies on
sys.getsizeof()
, which might not account for all memory overhead depending on the Python implementation. - Circular references are handled to prevent infinite recursion.
- Basic numeric types (int, float, complex) are assumed to have fixed sizes.
- String sizes are estimated based on character length and encoding.
1811 @staticmethod 1812 def duration_from_nanoseconds(ns: int, 1813 show_zeros_in_spoken_time: bool = False, 1814 spoken_time_separator=',', 1815 millennia: str = 'Millennia', 1816 century: str = 'Century', 1817 years: str = 'Years', 1818 days: str = 'Days', 1819 hours: str = 'Hours', 1820 minutes: str = 'Minutes', 1821 seconds: str = 'Seconds', 1822 milli_seconds: str = 'MilliSeconds', 1823 micro_seconds: str = 'MicroSeconds', 1824 nano_seconds: str = 'NanoSeconds', 1825 ) -> tuple: 1826 """ 1827 REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 1828 Convert NanoSeconds to Human Readable Time Format. 1829 A NanoSeconds is a unit of time in the International System of Units (SI) equal 1830 to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second. 1831 Its symbol is μs, sometimes simplified to us when Unicode is not available. 1832 A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond. 1833 1834 INPUT : ms (AKA: MilliSeconds) 1835 OUTPUT: tuple(string time_lapsed, string spoken_time) like format. 1836 OUTPUT Variables: time_lapsed, spoken_time 1837 1838 Example Input: duration_from_nanoseconds(ns) 1839 **"Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds"** 1840 Example Output: ('039:0001:047:325:05:02:03:456:789:012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds') 1841 duration_from_nanoseconds(1234567890123456789012) 1842 """ 1843 us, ns = divmod(ns, 1000) 1844 ms, us = divmod(us, 1000) 1845 s, ms = divmod(ms, 1000) 1846 m, s = divmod(s, 60) 1847 h, m = divmod(m, 60) 1848 d, h = divmod(h, 24) 1849 y, d = divmod(d, 365) 1850 c, y = divmod(y, 100) 1851 n, c = divmod(c, 10) 1852 time_lapsed = f"{n:03.0f}:{c:04.0f}:{y:03.0f}:{d:03.0f}:{h:02.0f}:{m:02.0f}:{s:02.0f}::{ms:03.0f}::{us:03.0f}::{ns:03.0f}" 1853 spoken_time_part = [] 1854 if n > 0 or show_zeros_in_spoken_time: 1855 spoken_time_part.append(f"{n: 3d} {millennia}") 1856 if c > 0 or show_zeros_in_spoken_time: 1857 spoken_time_part.append(f"{c: 4d} {century}") 1858 if y > 0 or show_zeros_in_spoken_time: 1859 spoken_time_part.append(f"{y: 3d} {years}") 1860 if d > 0 or show_zeros_in_spoken_time: 1861 spoken_time_part.append(f"{d: 4d} {days}") 1862 if h > 0 or show_zeros_in_spoken_time: 1863 spoken_time_part.append(f"{h: 2d} {hours}") 1864 if m > 0 or show_zeros_in_spoken_time: 1865 spoken_time_part.append(f"{m: 2d} {minutes}") 1866 if s > 0 or show_zeros_in_spoken_time: 1867 spoken_time_part.append(f"{s: 2d} {seconds}") 1868 if ms > 0 or show_zeros_in_spoken_time: 1869 spoken_time_part.append(f"{ms: 3d} {milli_seconds}") 1870 if us > 0 or show_zeros_in_spoken_time: 1871 spoken_time_part.append(f"{us: 3d} {micro_seconds}") 1872 if ns > 0 or show_zeros_in_spoken_time: 1873 spoken_time_part.append(f"{ns: 3d} {nano_seconds}") 1874 return time_lapsed, spoken_time_separator.join(spoken_time_part)
REF https://github.com/JayRizzo/Random_Scripts/blob/master/time_measure.py#L106 Convert NanoSeconds to Human Readable Time Format. A NanoSeconds is a unit of time in the International System of Units (SI) equal to one millionth (0.000001 or 10−6 or 1⁄1,000,000) of a second. Its symbol is μs, sometimes simplified to us when Unicode is not available. A microsecond is equal to 1000 nanoseconds or 1⁄1,000 of a millisecond.
INPUT : ms (AKA: MilliSeconds) OUTPUT: tuple(string time_lapsed, string spoken_time) like format. OUTPUT Variables: time_lapsed, spoken_time
Example Input: duration_from_nanoseconds(ns) "Millennium:Century:Years:Days:Hours:Minutes:Seconds:MilliSeconds:MicroSeconds:NanoSeconds" Example Output: ('039:0001:047:325:05:02:03:456:789:012', ' 39 Millennia, 1 Century, 47 Years, 325 Days, 5 Hours, 2 Minutes, 3 Seconds, 456 MilliSeconds, 789 MicroSeconds, 12 NanoSeconds') duration_from_nanoseconds(1234567890123456789012)
1876 @staticmethod 1877 def day_to_time(day: int, month: int = 6, year: int = 2024) -> int: # افتراض أن الشهر هو يونيو والسنة 2024 1878 """ 1879 Convert a specific day, month, and year into a timestamp. 1880 1881 Parameters: 1882 day (int): The day of the month. 1883 month (int): The month of the year. Default is 6 (June). 1884 year (int): The year. Default is 2024. 1885 1886 Returns: 1887 int: The timestamp representing the given day, month, and year. 1888 1889 Note: 1890 This method assumes the default month and year if not provided. 1891 """ 1892 return ZakatTracker.time(datetime.datetime(year, month, day))
Convert a specific day, month, and year into a timestamp.
Parameters: day (int): The day of the month. month (int): The month of the year. Default is 6 (June). year (int): The year. Default is 2024.
Returns: int: The timestamp representing the given day, month, and year.
Note: This method assumes the default month and year if not provided.
1894 @staticmethod 1895 def generate_random_date(start_date: datetime.datetime, end_date: datetime.datetime) -> datetime.datetime: 1896 """ 1897 Generate a random date between two given dates. 1898 1899 Parameters: 1900 start_date (datetime.datetime): The start date from which to generate a random date. 1901 end_date (datetime.datetime): The end date until which to generate a random date. 1902 1903 Returns: 1904 datetime.datetime: A random date between the start_date and end_date. 1905 """ 1906 time_between_dates = end_date - start_date 1907 days_between_dates = time_between_dates.days 1908 random_number_of_days = random.randrange(days_between_dates) 1909 return start_date + datetime.timedelta(days=random_number_of_days)
Generate a random date between two given dates.
Parameters: start_date (datetime.datetime): The start date from which to generate a random date. end_date (datetime.datetime): The end date until which to generate a random date.
Returns: datetime.datetime: A random date between the start_date and end_date.
1911 @staticmethod 1912 def generate_random_csv_file(path: str = "data.csv", count: int = 1000, with_rate: bool = False, 1913 debug: bool = False) -> int: 1914 """ 1915 Generate a random CSV file with specified parameters. 1916 1917 Parameters: 1918 path (str): The path where the CSV file will be saved. Default is "data.csv". 1919 count (int): The number of rows to generate in the CSV file. Default is 1000. 1920 with_rate (bool): If True, a random rate between 1.2% and 12% is added. Default is False. 1921 debug (bool): A flag indicating whether to print debug information. 1922 1923 Returns: 1924 None. The function generates a CSV file at the specified path with the given count of rows. 1925 Each row contains a randomly generated account, description, value, and date. 1926 The value is randomly generated between 1000 and 100000, 1927 and the date is randomly generated between 1950-01-01 and 2023-12-31. 1928 If the row number is not divisible by 13, the value is multiplied by -1. 1929 """ 1930 if debug: 1931 print('generate_random_csv_file', f'debug={debug}') 1932 i = 0 1933 with open(path, "w", newline="") as csvfile: 1934 writer = csv.writer(csvfile) 1935 for i in range(count): 1936 account = f"acc-{random.randint(1, 1000)}" 1937 desc = f"Some text {random.randint(1, 1000)}" 1938 value = random.randint(1000, 100000) 1939 date = ZakatTracker.generate_random_date(datetime.datetime(1950, 1, 1), 1940 datetime.datetime(2023, 12, 31)).strftime("%Y-%m-%d %H:%M:%S") 1941 if not i % 13 == 0: 1942 value *= -1 1943 row = [account, desc, value, date] 1944 if with_rate: 1945 rate = random.randint(1, 100) * 0.12 1946 if debug: 1947 print('before-append', row) 1948 row.append(rate) 1949 if debug: 1950 print('after-append', row) 1951 writer.writerow(row) 1952 i = i + 1 1953 return i
Generate a random CSV file with specified parameters.
Parameters: path (str): The path where the CSV file will be saved. Default is "data.csv". count (int): The number of rows to generate in the CSV file. Default is 1000. with_rate (bool): If True, a random rate between 1.2% and 12% is added. Default is False. debug (bool): A flag indicating whether to print debug information.
Returns: None. The function generates a CSV file at the specified path with the given count of rows. Each row contains a randomly generated account, description, value, and date. The value is randomly generated between 1000 and 100000, and the date is randomly generated between 1950-01-01 and 2023-12-31. If the row number is not divisible by 13, the value is multiplied by -1.
1955 @staticmethod 1956 def create_random_list(max_sum, min_value=0, max_value=10): 1957 """ 1958 Creates a list of random integers whose sum does not exceed the specified maximum. 1959 1960 Args: 1961 max_sum: The maximum allowed sum of the list elements. 1962 min_value: The minimum possible value for an element (inclusive). 1963 max_value: The maximum possible value for an element (inclusive). 1964 1965 Returns: 1966 A list of random integers. 1967 """ 1968 result = [] 1969 current_sum = 0 1970 1971 while current_sum < max_sum: 1972 # Calculate the remaining space for the next element 1973 remaining_sum = max_sum - current_sum 1974 # Determine the maximum possible value for the next element 1975 next_max_value = min(remaining_sum, max_value) 1976 # Generate a random element within the allowed range 1977 next_element = random.randint(min_value, next_max_value) 1978 result.append(next_element) 1979 current_sum += next_element 1980 1981 return result
Creates a list of random integers whose sum does not exceed the specified maximum.
Args: max_sum: The maximum allowed sum of the list elements. min_value: The minimum possible value for an element (inclusive). max_value: The maximum possible value for an element (inclusive).
Returns: A list of random integers.
2188 def test(self, debug: bool = False) -> bool: 2189 if debug: 2190 print('test', f'debug={debug}') 2191 try: 2192 2193 assert self._history() 2194 2195 # Not allowed for duplicate transactions in the same account and time 2196 2197 created = ZakatTracker.time() 2198 self.track(100, 'test-1', 'same', True, created) 2199 failed = False 2200 try: 2201 self.track(50, 'test-1', 'same', True, created) 2202 except: 2203 failed = True 2204 assert failed is True 2205 2206 self.reset() 2207 2208 # Same account transfer 2209 for x in [1, 'a', True, 1.8, None]: 2210 failed = False 2211 try: 2212 self.transfer(1, x, x, 'same-account', debug=debug) 2213 except: 2214 failed = True 2215 assert failed is True 2216 2217 # Always preserve box age during transfer 2218 2219 series: list[tuple] = [ 2220 (30, 4), 2221 (60, 3), 2222 (90, 2), 2223 ] 2224 case = { 2225 30: { 2226 'series': series, 2227 'rest': 150, 2228 }, 2229 60: { 2230 'series': series, 2231 'rest': 120, 2232 }, 2233 90: { 2234 'series': series, 2235 'rest': 90, 2236 }, 2237 180: { 2238 'series': series, 2239 'rest': 0, 2240 }, 2241 270: { 2242 'series': series, 2243 'rest': -90, 2244 }, 2245 360: { 2246 'series': series, 2247 'rest': -180, 2248 }, 2249 } 2250 2251 selected_time = ZakatTracker.time() - ZakatTracker.TimeCycle() 2252 2253 for total in case: 2254 for x in case[total]['series']: 2255 self.track(x[0], f"test-{x} ages", 'ages', True, selected_time * x[1]) 2256 2257 refs = self.transfer(total, 'ages', 'future', 'Zakat Movement', debug=debug) 2258 2259 if debug: 2260 print('refs', refs) 2261 2262 ages_cache_balance = self.balance('ages') 2263 ages_fresh_balance = self.balance('ages', False) 2264 rest = case[total]['rest'] 2265 if debug: 2266 print('source', ages_cache_balance, ages_fresh_balance, rest) 2267 assert ages_cache_balance == rest 2268 assert ages_fresh_balance == rest 2269 2270 future_cache_balance = self.balance('future') 2271 future_fresh_balance = self.balance('future', False) 2272 if debug: 2273 print('target', future_cache_balance, future_fresh_balance, total) 2274 print('refs', refs) 2275 assert future_cache_balance == total 2276 assert future_fresh_balance == total 2277 2278 for ref in self._vault['account']['ages']['box']: 2279 ages_capital = self._vault['account']['ages']['box'][ref]['capital'] 2280 ages_rest = self._vault['account']['ages']['box'][ref]['rest'] 2281 future_capital = 0 2282 if ref in self._vault['account']['future']['box']: 2283 future_capital = self._vault['account']['future']['box'][ref]['capital'] 2284 future_rest = 0 2285 if ref in self._vault['account']['future']['box']: 2286 future_rest = self._vault['account']['future']['box'][ref]['rest'] 2287 if ages_capital != 0 and future_capital != 0 and future_rest != 0: 2288 if debug: 2289 print('================================================================') 2290 print('ages', ages_capital, ages_rest) 2291 print('future', future_capital, future_rest) 2292 if ages_rest == 0: 2293 assert ages_capital == future_capital 2294 elif ages_rest < 0: 2295 assert -ages_capital == future_capital 2296 elif ages_rest > 0: 2297 assert ages_capital == ages_rest + future_capital 2298 self.reset() 2299 assert len(self._vault['history']) == 0 2300 2301 assert self._history() 2302 assert self._history(False) is False 2303 assert self._history() is False 2304 assert self._history(True) 2305 assert self._history() 2306 2307 self._test_core(True, debug) 2308 self._test_core(False, debug) 2309 2310 transaction = [ 2311 ( 2312 20, 'wallet', 1, 800, 800, 800, 4, 5, 2313 -85, -85, -85, 6, 7, 2314 ), 2315 ( 2316 750, 'wallet', 'safe', 50, 50, 50, 4, 6, 2317 750, 750, 750, 1, 1, 2318 ), 2319 ( 2320 600, 'safe', 'bank', 150, 150, 150, 1, 2, 2321 600, 600, 600, 1, 1, 2322 ), 2323 ] 2324 for z in transaction: 2325 self.lock() 2326 x = z[1] 2327 y = z[2] 2328 self.transfer(z[0], x, y, 'test-transfer', debug=debug) 2329 assert self.balance(x) == z[3] 2330 xx = self.accounts()[x] 2331 assert xx == z[3] 2332 assert self.balance(x, False) == z[4] 2333 assert xx == z[4] 2334 2335 s = 0 2336 log = self._vault['account'][x]['log'] 2337 for i in log: 2338 s += log[i]['value'] 2339 if debug: 2340 print('s', s, 'z[5]', z[5]) 2341 assert s == z[5] 2342 2343 assert self.box_size(x) == z[6] 2344 assert self.log_size(x) == z[7] 2345 2346 yy = self.accounts()[y] 2347 assert self.balance(y) == z[8] 2348 assert yy == z[8] 2349 assert self.balance(y, False) == z[9] 2350 assert yy == z[9] 2351 2352 s = 0 2353 log = self._vault['account'][y]['log'] 2354 for i in log: 2355 s += log[i]['value'] 2356 assert s == z[10] 2357 2358 assert self.box_size(y) == z[11] 2359 assert self.log_size(y) == z[12] 2360 2361 if debug: 2362 pp().pprint(self.check(2.17)) 2363 2364 assert not self.nolock() 2365 history_count = len(self._vault['history']) 2366 if debug: 2367 print('history-count', history_count) 2368 assert history_count == 11 2369 assert not self.free(ZakatTracker.time()) 2370 assert self.free(self.lock()) 2371 assert self.nolock() 2372 assert len(self._vault['history']) == 11 2373 2374 # storage 2375 2376 _path = self.path('test.pickle') 2377 if os.path.exists(_path): 2378 os.remove(_path) 2379 self.save() 2380 assert os.path.getsize(_path) > 0 2381 self.reset() 2382 assert self.recall(False, debug) is False 2383 self.load() 2384 assert self._vault['account'] is not None 2385 2386 # recall 2387 2388 assert self.nolock() 2389 assert len(self._vault['history']) == 11 2390 assert self.recall(False, debug) is True 2391 assert len(self._vault['history']) == 10 2392 assert self.recall(False, debug) is True 2393 assert len(self._vault['history']) == 9 2394 2395 # exchange 2396 2397 self.exchange("cash", 25, 3.75, "2024-06-25") 2398 self.exchange("cash", 22, 3.73, "2024-06-22") 2399 self.exchange("cash", 15, 3.69, "2024-06-15") 2400 self.exchange("cash", 10, 3.66) 2401 2402 for i in range(1, 30): 2403 exchange = self.exchange("cash", i) 2404 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2405 if debug: 2406 print(i, rate, description, created) 2407 assert created 2408 if i < 10: 2409 assert rate == 1 2410 assert description is None 2411 elif i == 10: 2412 assert rate == 3.66 2413 assert description is None 2414 elif i < 15: 2415 assert rate == 3.66 2416 assert description is None 2417 elif i == 15: 2418 assert rate == 3.69 2419 assert description is not None 2420 elif i < 22: 2421 assert rate == 3.69 2422 assert description is not None 2423 elif i == 22: 2424 assert rate == 3.73 2425 assert description is not None 2426 elif i >= 25: 2427 assert rate == 3.75 2428 assert description is not None 2429 exchange = self.exchange("bank", i) 2430 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2431 if debug: 2432 print(i, rate, description, created) 2433 assert created 2434 assert rate == 1 2435 assert description is None 2436 2437 assert len(self._vault['exchange']) > 0 2438 assert len(self.exchanges()) > 0 2439 self._vault['exchange'].clear() 2440 assert len(self._vault['exchange']) == 0 2441 assert len(self.exchanges()) == 0 2442 2443 # حفظ أسعار الصرف باستخدام التواريخ بالنانو ثانية 2444 self.exchange("cash", ZakatTracker.day_to_time(25), 3.75, "2024-06-25") 2445 self.exchange("cash", ZakatTracker.day_to_time(22), 3.73, "2024-06-22") 2446 self.exchange("cash", ZakatTracker.day_to_time(15), 3.69, "2024-06-15") 2447 self.exchange("cash", ZakatTracker.day_to_time(10), 3.66) 2448 2449 for i in [x * 0.12 for x in range(-15, 21)]: 2450 if i <= 0: 2451 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) == 0 2452 else: 2453 assert len(self.exchange("test", ZakatTracker.time(), i, f"range({i})")) > 0 2454 2455 # اختبار النتائج باستخدام التواريخ بالنانو ثانية 2456 for i in range(1, 31): 2457 timestamp_ns = ZakatTracker.day_to_time(i) 2458 exchange = self.exchange("cash", timestamp_ns) 2459 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2460 if debug: 2461 print(i, rate, description, created) 2462 assert created 2463 if i < 10: 2464 assert rate == 1 2465 assert description is None 2466 elif i == 10: 2467 assert rate == 3.66 2468 assert description is None 2469 elif i < 15: 2470 assert rate == 3.66 2471 assert description is None 2472 elif i == 15: 2473 assert rate == 3.69 2474 assert description is not None 2475 elif i < 22: 2476 assert rate == 3.69 2477 assert description is not None 2478 elif i == 22: 2479 assert rate == 3.73 2480 assert description is not None 2481 elif i >= 25: 2482 assert rate == 3.75 2483 assert description is not None 2484 exchange = self.exchange("bank", i) 2485 rate, description, created = exchange['rate'], exchange['description'], exchange['time'] 2486 if debug: 2487 print(i, rate, description, created) 2488 assert created 2489 assert rate == 1 2490 assert description is None 2491 2492 # csv 2493 2494 csv_count = 1000 2495 2496 for with_rate, path in { 2497 False: 'test-import_csv-no-exchange', 2498 True: 'test-import_csv-with-exchange', 2499 }.items(): 2500 2501 if debug: 2502 print('test_import_csv', with_rate, path) 2503 2504 # csv 2505 2506 csv_path = path + '.csv' 2507 if os.path.exists(csv_path): 2508 os.remove(csv_path) 2509 c = self.generate_random_csv_file(csv_path, csv_count, with_rate, debug) 2510 if debug: 2511 print('generate_random_csv_file', c) 2512 assert c == csv_count 2513 assert os.path.getsize(csv_path) > 0 2514 cache_path = self.import_csv_cache_path() 2515 if os.path.exists(cache_path): 2516 os.remove(cache_path) 2517 self.reset() 2518 (created, found, bad) = self.import_csv(csv_path, debug) 2519 bad_count = len(bad) 2520 if debug: 2521 print(f"csv-imported: ({created}, {found}, {bad_count}) = count({csv_count})") 2522 tmp_size = os.path.getsize(cache_path) 2523 assert tmp_size > 0 2524 assert created + found + bad_count == csv_count 2525 assert created == csv_count 2526 assert bad_count == 0 2527 (created_2, found_2, bad_2) = self.import_csv(csv_path) 2528 bad_2_count = len(bad_2) 2529 if debug: 2530 print(f"csv-imported: ({created_2}, {found_2}, {bad_2_count})") 2531 print(bad) 2532 assert tmp_size == os.path.getsize(cache_path) 2533 assert created_2 + found_2 + bad_2_count == csv_count 2534 assert created == found_2 2535 assert bad_count == bad_2_count 2536 assert found_2 == csv_count 2537 assert bad_2_count == 0 2538 assert created_2 == 0 2539 2540 # payment parts 2541 2542 positive_parts = self.build_payment_parts(100, positive_only=True) 2543 assert self.check_payment_parts(positive_parts) != 0 2544 assert self.check_payment_parts(positive_parts) != 0 2545 all_parts = self.build_payment_parts(300, positive_only=False) 2546 assert self.check_payment_parts(all_parts) != 0 2547 assert self.check_payment_parts(all_parts) != 0 2548 if debug: 2549 pp().pprint(positive_parts) 2550 pp().pprint(all_parts) 2551 # dynamic discount 2552 suite = [] 2553 count = 3 2554 for exceed in [False, True]: 2555 case = [] 2556 for parts in [positive_parts, all_parts]: 2557 part = parts.copy() 2558 demand = part['demand'] 2559 if debug: 2560 print(demand, part['total']) 2561 i = 0 2562 z = demand / count 2563 cp = { 2564 'account': {}, 2565 'demand': demand, 2566 'exceed': exceed, 2567 'total': part['total'], 2568 } 2569 j = '' 2570 for x, y in part['account'].items(): 2571 x_exchange = self.exchange(x) 2572 zz = self.exchange_calc(z, 1, x_exchange['rate']) 2573 if exceed and zz <= demand: 2574 i += 1 2575 y['part'] = zz 2576 if debug: 2577 print(exceed, y) 2578 cp['account'][x] = y 2579 case.append(y) 2580 elif not exceed and y['balance'] >= zz: 2581 i += 1 2582 y['part'] = zz 2583 if debug: 2584 print(exceed, y) 2585 cp['account'][x] = y 2586 case.append(y) 2587 j = x 2588 if i >= count: 2589 break 2590 if len(cp['account'][j]) > 0: 2591 suite.append(cp) 2592 if debug: 2593 print('suite', len(suite)) 2594 # vault = self._vault.copy() 2595 for case in suite: 2596 # self._vault = vault.copy() 2597 if debug: 2598 print('case', case) 2599 result = self.check_payment_parts(case) 2600 if debug: 2601 print('check_payment_parts', result, f'exceed: {exceed}') 2602 assert result == 0 2603 2604 report = self.check(2.17, None, debug) 2605 (valid, brief, plan) = report 2606 if debug: 2607 print('valid', valid) 2608 zakat_result = self.zakat(report, parts=case, debug=debug) 2609 if debug: 2610 print('zakat-result', zakat_result) 2611 assert valid == zakat_result 2612 2613 assert self.save(path + '.pickle') 2614 assert self.export_json(path + '.json') 2615 2616 assert self.export_json("1000-transactions-test.json") 2617 assert self.save("1000-transactions-test.pickle") 2618 2619 self.reset() 2620 2621 # test transfer between accounts with different exchange rate 2622 2623 a_SAR = "Bank (SAR)" 2624 b_USD = "Bank (USD)" 2625 c_SAR = "Safe (SAR)" 2626 # 0: track, 1: check-exchange, 2: do-exchange, 3: transfer 2627 for case in [ 2628 (0, a_SAR, "SAR Gift", 1000, 1000), 2629 (1, a_SAR, 1), 2630 (0, b_USD, "USD Gift", 500, 500), 2631 (1, b_USD, 1), 2632 (2, b_USD, 3.75), 2633 (1, b_USD, 3.75), 2634 (3, 100, b_USD, a_SAR, "100 USD -> SAR", 400, 1375), 2635 (0, c_SAR, "Salary", 750, 750), 2636 (3, 375, c_SAR, b_USD, "375 SAR -> USD", 375, 500), 2637 (3, 3.75, a_SAR, b_USD, "3.75 SAR -> USD", 1371.25, 501), 2638 ]: 2639 match (case[0]): 2640 case 0: # track 2641 _, account, desc, x, balance = case 2642 self.track(value=x, desc=desc, account=account, debug=debug) 2643 2644 cached_value = self.balance(account, cached=True) 2645 fresh_value = self.balance(account, cached=False) 2646 if debug: 2647 print('account', account, 'cached_value', cached_value, 'fresh_value', fresh_value) 2648 assert cached_value == balance 2649 assert fresh_value == balance 2650 case 1: # check-exchange 2651 _, account, expected_rate = case 2652 t_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2653 if debug: 2654 print('t-exchange', t_exchange) 2655 assert t_exchange['rate'] == expected_rate 2656 case 2: # do-exchange 2657 _, account, rate = case 2658 self.exchange(account, rate=rate, debug=debug) 2659 b_exchange = self.exchange(account, created=ZakatTracker.time(), debug=debug) 2660 if debug: 2661 print('b-exchange', b_exchange) 2662 assert b_exchange['rate'] == rate 2663 case 3: # transfer 2664 _, x, a, b, desc, a_balance, b_balance = case 2665 self.transfer(x, a, b, desc, debug=debug) 2666 2667 cached_value = self.balance(a, cached=True) 2668 fresh_value = self.balance(a, cached=False) 2669 if debug: 2670 print('account', a, 'cached_value', cached_value, 'fresh_value', fresh_value) 2671 assert cached_value == a_balance 2672 assert fresh_value == a_balance 2673 2674 cached_value = self.balance(b, cached=True) 2675 fresh_value = self.balance(b, cached=False) 2676 if debug: 2677 print('account', b, 'cached_value', cached_value, 'fresh_value', fresh_value) 2678 assert cached_value == b_balance 2679 assert fresh_value == b_balance 2680 2681 # Transfer all in many chunks randomly from B to A 2682 a_SAR_balance = 1371.25 2683 b_USD_balance = 501 2684 b_USD_exchange = self.exchange(b_USD) 2685 amounts = ZakatTracker.create_random_list(b_USD_balance) 2686 if debug: 2687 print('amounts', amounts) 2688 i = 0 2689 for x in amounts: 2690 if debug: 2691 print(f'{i} - transfer-with-exchange({x})') 2692 self.transfer(x, b_USD, a_SAR, f"{x} USD -> SAR", debug=debug) 2693 2694 b_USD_balance -= x 2695 cached_value = self.balance(b_USD, cached=True) 2696 fresh_value = self.balance(b_USD, cached=False) 2697 if debug: 2698 print('account', b_USD, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 2699 b_USD_balance) 2700 assert cached_value == b_USD_balance 2701 assert fresh_value == b_USD_balance 2702 2703 a_SAR_balance += x * b_USD_exchange['rate'] 2704 cached_value = self.balance(a_SAR, cached=True) 2705 fresh_value = self.balance(a_SAR, cached=False) 2706 if debug: 2707 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 2708 a_SAR_balance, 'rate', b_USD_exchange['rate']) 2709 assert cached_value == a_SAR_balance 2710 assert fresh_value == a_SAR_balance 2711 i += 1 2712 2713 # Transfer all in many chunks randomly from C to A 2714 c_SAR_balance = 375 2715 amounts = ZakatTracker.create_random_list(c_SAR_balance) 2716 if debug: 2717 print('amounts', amounts) 2718 i = 0 2719 for x in amounts: 2720 if debug: 2721 print(f'{i} - transfer-with-exchange({x})') 2722 self.transfer(x, c_SAR, a_SAR, f"{x} SAR -> a_SAR", debug=debug) 2723 2724 c_SAR_balance -= x 2725 cached_value = self.balance(c_SAR, cached=True) 2726 fresh_value = self.balance(c_SAR, cached=False) 2727 if debug: 2728 print('account', c_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'excepted', 2729 c_SAR_balance) 2730 assert cached_value == c_SAR_balance 2731 assert fresh_value == c_SAR_balance 2732 2733 a_SAR_balance += x 2734 cached_value = self.balance(a_SAR, cached=True) 2735 fresh_value = self.balance(a_SAR, cached=False) 2736 if debug: 2737 print('account', a_SAR, 'cached_value', cached_value, 'fresh_value', fresh_value, 'expected', 2738 a_SAR_balance) 2739 assert cached_value == a_SAR_balance 2740 assert fresh_value == a_SAR_balance 2741 i += 1 2742 2743 assert self.export_json("accounts-transfer-with-exchange-rates.json") 2744 assert self.save("accounts-transfer-with-exchange-rates.pickle") 2745 2746 # check & zakat with exchange rates for many cycles 2747 2748 for rate, values in { 2749 1: { 2750 'in': [1000, 2000, 10000], 2751 'exchanged': [1000, 2000, 10000], 2752 'out': [25, 50, 731.40625], 2753 }, 2754 3.75: { 2755 'in': [200, 1000, 5000], 2756 'exchanged': [750, 3750, 18750], 2757 'out': [18.75, 93.75, 1371.38671875], 2758 }, 2759 }.items(): 2760 a, b, c = values['in'] 2761 m, n, o = values['exchanged'] 2762 x, y, z = values['out'] 2763 if debug: 2764 print('rate', rate, 'values', values) 2765 for case in [ 2766 (a, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 2767 {'safe': {0: {'below_nisab': x}}}, 2768 ], False, m), 2769 (b, 'safe', ZakatTracker.time() - ZakatTracker.TimeCycle(), [ 2770 {'safe': {0: {'count': 1, 'total': y}}}, 2771 ], True, n), 2772 (c, 'cave', ZakatTracker.time() - (ZakatTracker.TimeCycle() * 3), [ 2773 {'cave': {0: {'count': 3, 'total': z}}}, 2774 ], True, o), 2775 ]: 2776 if debug: 2777 print(f"############# check(rate: {rate}) #############") 2778 self.reset() 2779 self.exchange(account=case[1], created=case[2], rate=rate) 2780 self.track(value=case[0], desc='test-check', account=case[1], logging=True, created=case[2]) 2781 2782 # assert self.nolock() 2783 # history_size = len(self._vault['history']) 2784 # print('history_size', history_size) 2785 # assert history_size == 2 2786 assert self.lock() 2787 assert not self.nolock() 2788 report = self.check(2.17, None, debug) 2789 (valid, brief, plan) = report 2790 assert valid == case[4] 2791 if debug: 2792 print('brief', brief) 2793 assert case[5] == brief[0] 2794 assert case[5] == brief[1] 2795 2796 if debug: 2797 pp().pprint(plan) 2798 2799 for x in plan: 2800 assert case[1] == x 2801 if 'total' in case[3][0][x][0].keys(): 2802 assert case[3][0][x][0]['total'] == brief[2] 2803 assert plan[x][0]['total'] == case[3][0][x][0]['total'] 2804 assert plan[x][0]['count'] == case[3][0][x][0]['count'] 2805 else: 2806 assert plan[x][0]['below_nisab'] == case[3][0][x][0]['below_nisab'] 2807 if debug: 2808 pp().pprint(report) 2809 result = self.zakat(report, debug=debug) 2810 if debug: 2811 print('zakat-result', result, case[4]) 2812 assert result == case[4] 2813 report = self.check(2.17, None, debug) 2814 (valid, brief, plan) = report 2815 assert valid is False 2816 2817 history_size = len(self._vault['history']) 2818 if debug: 2819 print('history_size', history_size) 2820 assert history_size == 3 2821 assert not self.nolock() 2822 assert self.recall(False, debug) is False 2823 self.free(self.lock()) 2824 assert self.nolock() 2825 2826 for i in range(3, 0, -1): 2827 history_size = len(self._vault['history']) 2828 if debug: 2829 print('history_size', history_size) 2830 assert history_size == i 2831 assert self.recall(False, debug) is True 2832 2833 assert self.nolock() 2834 assert self.recall(False, debug) is False 2835 2836 history_size = len(self._vault['history']) 2837 if debug: 2838 print('history_size', history_size) 2839 assert history_size == 0 2840 2841 account_size = len(self._vault['account']) 2842 if debug: 2843 print('account_size', account_size) 2844 assert account_size == 0 2845 2846 report_size = len(self._vault['report']) 2847 if debug: 2848 print('report_size', report_size) 2849 assert report_size == 0 2850 2851 assert self.nolock() 2852 return True 2853 except: 2854 # pp().pprint(self._vault) 2855 assert self.export_json("test-snapshot.json") 2856 assert self.save("test-snapshot.pickle") 2857 raise
2860def test(debug: bool = False): 2861 ledger = ZakatTracker() 2862 start = ZakatTracker.time() 2863 assert ledger.test(debug=debug) 2864 if debug: 2865 print("#########################") 2866 print("######## TEST DONE ########") 2867 print("#########################") 2868 print(ZakatTracker.duration_from_nanoseconds(ZakatTracker.time() - start)) 2869 print("#########################")
73class Action(Enum): 74 CREATE = auto() 75 TRACK = auto() 76 LOG = auto() 77 SUB = auto() 78 ADD_FILE = auto() 79 REMOVE_FILE = auto() 80 BOX_TRANSFER = auto() 81 EXCHANGE = auto() 82 REPORT = auto() 83 ZAKAT = auto()
86class JSONEncoder(json.JSONEncoder): 87 def default(self, obj): 88 if isinstance(obj, Action) or isinstance(obj, MathOperation): 89 return obj.name # Serialize as the enum member's name 90 elif isinstance(obj, Decimal): 91 return float(obj) 92 return super().default(obj)
Extensible JSON https://json.org encoder for Python data structures.
Supports the following objects and types by default:
+-------------------+---------------+ | Python | JSON | +===================+===============+ | dict | object | +-------------------+---------------+ | list, tuple | array | +-------------------+---------------+ | str | string | +-------------------+---------------+ | int, float | number | +-------------------+---------------+ | True | true | +-------------------+---------------+ | False | false | +-------------------+---------------+ | None | null | +-------------------+---------------+
To extend this to recognize other objects, subclass and implement a
.default()
method with another method that returns a serializable
object for o
if possible, otherwise it should call the superclass
implementation (to raise TypeError
).
87 def default(self, obj): 88 if isinstance(obj, Action) or isinstance(obj, MathOperation): 89 return obj.name # Serialize as the enum member's name 90 elif isinstance(obj, Decimal): 91 return float(obj) 92 return super().default(obj)
Implement this method in a subclass such that it returns
a serializable object for o
, or calls the base implementation
(to raise a TypeError
).
For example, to support arbitrary iterators, you could implement default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
# Let the base class default method raise the TypeError
return super().default(o)
55def start_file_server(database_path: str, database_callback: callable = None, csv_callback: callable = None, 56 debug: bool = False) -> tuple: 57 """ 58 Starts a multi-purpose HTTP server to manage file interactions for a Zakat application. 59 60 This server facilitates the following functionalities: 61 62 1. GET /{file_uuid}/get: Download the database file specified by `database_path`. 63 2. GET /{file_uuid}/upload: Display an HTML form for uploading files. 64 3. POST /{file_uuid}/upload: Handle file uploads, distinguishing between: 65 - Database File (.db): Replaces the existing database with the uploaded one. 66 - CSV File (.csv): Imports data from the CSV into the existing database. 67 68 Args: 69 database_path (str): The path to the pickle database file. 70 database_callback (callable, optional): A function to call after a successful database upload. 71 It receives the uploaded database path as its argument. 72 csv_callback (callable, optional): A function to call after a successful CSV upload. It receives the uploaded CSV path, 73 the database path, and the debug flag as its arguments. 74 debug (bool, optional): If True, print debugging information. Defaults to False. 75 76 Returns: 77 Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing: 78 - file_name (str): The name of the database file. 79 - download_url (str): The URL to download the database file. 80 - upload_url (str): The URL to access the file upload form. 81 - server_thread (threading.Thread): The thread running the server. 82 - shutdown_server (Callable[[], None]): A function to gracefully shut down the server. 83 84 Example: 85 _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db") 86 print(f"Download database: {download_url}") 87 print(f"Upload files: {upload_url}") 88 server_thread.start() 89 # ... later ... 90 shutdown_server() 91 """ 92 file_uuid = uuid.uuid4() 93 file_name = os.path.basename(database_path) 94 95 port = find_available_port() 96 download_url = f"http://localhost:{port}/{file_uuid}/get" 97 upload_url = f"http://localhost:{port}/{file_uuid}/upload" 98 99 class Handler(http.server.SimpleHTTPRequestHandler): 100 def do_GET(self): 101 if self.path == f"/{file_uuid}/get": 102 # GET: Serve the existing file 103 try: 104 with open(database_path, "rb") as f: 105 self.send_response(200) 106 self.send_header("Content-type", "application/octet-stream") 107 self.send_header("Content-Disposition", f'attachment; filename="{file_name}"') 108 self.end_headers() 109 self.wfile.write(f.read()) 110 except FileNotFoundError: 111 self.send_error(404, "File not found") 112 elif self.path == f"/{file_uuid}/upload": 113 # GET: Serve the upload form 114 self.send_response(200) 115 self.send_header("Content-type", "text/html") 116 self.end_headers() 117 self.wfile.write(f""" 118 <html lang="en"> 119 <head> 120 <title>Zakat File Server</title> 121 </head> 122 <body> 123 <h1>Zakat File Server</h1> 124 <h3>You can download the <a target="__blank" href="{download_url}">database file</a>...</h3> 125 <h3>Or upload a new file to restore a database or import `CSV` file:</h3> 126 <form action="/{file_uuid}/upload" method="post" enctype="multipart/form-data"> 127 <input type="file" name="file" required><br/> 128 <input type="radio" id="{FileType.Database.value}" name="upload_type" value="{FileType.Database.value}" required> 129 <label for="database">Database File</label><br/> 130 <input type="radio"id="{FileType.CSV.value}" name="upload_type" value="{FileType.CSV.value}"> 131 <label for="csv">CSV File</label><br/> 132 <input type="submit" value="Upload"><br/> 133 </form> 134 </body></html> 135 """.encode()) 136 else: 137 self.send_error(404) 138 139 def do_POST(self): 140 if self.path == f"/{file_uuid}/upload": 141 # POST: Handle request 142 # 1. Get the Form Data 143 form_data = cgi.FieldStorage( 144 fp=self.rfile, 145 headers=self.headers, 146 environ={'REQUEST_METHOD': 'POST'} 147 ) 148 upload_type = form_data.getvalue("upload_type") 149 150 if debug: 151 print('upload_type', upload_type) 152 153 if upload_type not in [FileType.Database.value, FileType.CSV.value]: 154 self.send_error(400, "Invalid upload type") 155 return 156 157 # 2. Extract File Data 158 file_item = form_data['file'] # Assuming 'file' is your file input name 159 160 # 3. Get File Details 161 filename = file_item.filename 162 file_data = file_item.file.read() # Read the file's content 163 164 if debug: 165 print(f'Uploaded filename: {filename}') 166 167 # 4. Define Storage Path for CSV 168 upload_directory = "./uploads" # Create this directory if it doesn't exist 169 os.makedirs(upload_directory, exist_ok=True) 170 file_path = os.path.join(upload_directory, upload_type) 171 172 # 5. Write to Disk 173 with open(file_path, 'wb') as f: 174 f.write(file_data) 175 176 match upload_type: 177 case FileType.Database.value: 178 179 try: 180 # 6. Verify database file 181 # ZakatTracker(db_path=file_path) # FATAL, Circular Imports Error 182 if database_callback is not None: 183 database_callback(file_path) 184 185 # 7. Copy database into the original path 186 shutil.copy2(file_path, database_path) 187 except Exception as e: 188 self.send_error(400, str(e)) 189 return 190 191 case FileType.CSV.value: 192 # 6. Verify CSV file 193 try: 194 # x = ZakatTracker(db_path=database_path) # FATAL, Circular Imports Error 195 # result = x.import_csv(file_path, debug=debug) 196 if csv_callback is not None: 197 result = csv_callback(file_path, database_path, debug) 198 if debug: 199 print(f'CSV imported: {result}') 200 if len(result[2]) != 0: 201 self.send_response(200) 202 self.end_headers() 203 self.wfile.write(json.dumps(result).encode()) 204 return 205 except Exception as e: 206 self.send_error(400, str(e)) 207 return 208 209 self.send_response(200) 210 self.end_headers() 211 self.wfile.write(b"File uploaded successfully.") 212 213 httpd = socketserver.TCPServer(("localhost", port), Handler) 214 server_thread = threading.Thread(target=httpd.serve_forever) 215 216 def shutdown_server(): 217 nonlocal httpd, server_thread 218 httpd.shutdown() 219 httpd.server_close() # Close the socket 220 server_thread.join() # Wait for the thread to finish 221 222 return file_name, download_url, upload_url, server_thread, shutdown_server
Starts a multi-purpose HTTP server to manage file interactions for a Zakat application.
This server facilitates the following functionalities:
- GET /{file_uuid}/get: Download the database file specified by
database_path
. - GET /{file_uuid}/upload: Display an HTML form for uploading files.
- POST /{file_uuid}/upload: Handle file uploads, distinguishing between:
- Database File (.db): Replaces the existing database with the uploaded one.
- CSV File (.csv): Imports data from the CSV into the existing database.
Args: database_path (str): The path to the pickle database file. database_callback (callable, optional): A function to call after a successful database upload. It receives the uploaded database path as its argument. csv_callback (callable, optional): A function to call after a successful CSV upload. It receives the uploaded CSV path, the database path, and the debug flag as its arguments. debug (bool, optional): If True, print debugging information. Defaults to False.
Returns: Tuple[str, str, str, threading.Thread, Callable[[], None]]: A tuple containing: - file_name (str): The name of the database file. - download_url (str): The URL to download the database file. - upload_url (str): The URL to access the file upload form. - server_thread (threading.Thread): The thread running the server. - shutdown_server (Callable[[], None]): A function to gracefully shut down the server.
Example: _, download_url, upload_url, server_thread, shutdown_server = start_file_server("zakat.db") print(f"Download database: {download_url}") print(f"Upload files: {upload_url}") server_thread.start() # ... later ... shutdown_server()
32def find_available_port() -> int: 33 """ 34 Finds and returns an available TCP port on the local machine. 35 36 This function utilizes a TCP server socket to bind to port 0, which 37 instructs the operating system to automatically assign an available 38 port. The assigned port is then extracted and returned. 39 40 Returns: 41 int: The available TCP port number. 42 43 Raises: 44 OSError: If an error occurs during the port binding process, such 45 as all ports being in use. 46 47 Example: 48 port = find_available_port() 49 print(f"Available port: {port}") 50 """ 51 with socketserver.TCPServer(("localhost", 0), None) as s: 52 return s.server_address[1]
Finds and returns an available TCP port on the local machine.
This function utilizes a TCP server socket to bind to port 0, which instructs the operating system to automatically assign an available port. The assigned port is then extracted and returned.
Returns: int: The available TCP port number.
Raises: OSError: If an error occurs during the port binding process, such as all ports being in use.
Example: port = find_available_port() print(f"Available port: {port}")