Python Integration Guide Home > Developers > Python Integration guide to integrating with Noderr Protocol using Python with production-ready code examples. Smart Contract Calls python from web3 import Web3 from web3.contract import Contract import json import os # Initialize Web3 provider w3 = Web3(Web3.HTTPProvider(os.getenv('RPC_URL'))) # Contract configuration VAULT_ADDRESS = '0x...' VAULT_ABI = json.loads('''[...]''') # Create contract instance vault_contract: Contract = w3.eth.contract( address=Web3.toChecksumAddress(VAULT_ADDRESS), abi=VAULT_ABI ) # Get vault balance def get_vault_balance(user_address: str) -> float: try: balance = vault_contract.functions.balanceOf( Web3.toChecksumAddress(user_address) ).call() return Web3.fromWei(balance, 'ether') except Exception as e: raise ContractCallError(f'Failed to get balance: {e}') # Deposit to vault def deposit_to_vault(amount: float, private_key: str) -> str: try: account = w3.eth.account.from_key(private_key) amount_wei = Web3.toWei(amount, 'ether') tx = vault_contract.functions.deposit( amount_wei, account.address ).buildTransaction({ 'from': account.address, 'nonce': w3.eth.get_transaction_count(account.address), 'gas': 200000, 'gasPrice': w3.eth.gas_price, }) signed_tx = w3.eth.account.sign_transaction(tx, private_key) tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction) return tx_hash.hex() except Exception as e: raise ContractCallError(f'Deposit failed: {e}') # Withdraw from vault def withdraw_from_vault(shares: float, private_key: str) -> str: try: account = w3.eth.account.from_key(private_key) shares_wei = Web3.toWei(shares, 'ether') tx = vault_contract.functions.withdraw( shares_wei, account.address, account.address ).buildTransaction({ 'from': account.address, 'nonce': w3.eth.get_transaction_count(account.address), 'gas': 200000, 'gasPrice': w3.eth.gas_price, }) signed_tx = w3.eth.account.sign_transaction(tx, private_key) tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction) return tx_hash.hex() except Exception as e: raise ContractCallError(f'Withdrawal failed: {e}') # Monitor transaction status def wait_for_transaction(tx_hash: str, timeout: int = 300) -> dict: try: receipt = w3.eth.wait_for_transaction_receipt( tx_hash, timeout=timeout ) if receipt['status'] == 1: return {'success': True, 'receipt': receipt} else: return {'success': False, 'receipt': receipt} except Exception as e: raise TransactionError(f'Failed to monitor transaction: {e}') class ContractCallError(Exception): pass class TransactionError(Exception): pass Async/Await Patterns python import asyncio from typing import List, Dict, Any async def fetch_multiple_vaults(vault_ids: List[str]) -> List[Dict[str, Any]]: client = NoderrClient() tasks = [ asyncio.to_thread(client.get, f'/vaults/{vault_id}') for vault_id in vault_ids ] results = await asyncio.gather(*tasks, return_exceptions=True) vaults = [] for vault_id, result in zip(vault_ids, results): if isinstance(result, Exception): print(f'Error fetching vault {vault_id}: {result}') else: vaults.append(result) return vaults async def fetch_with_timeout(client, endpoint: str, timeout: int = 10) -> Dict: try: return await asyncio.wait_for( asyncio.to_thread(client.get, endpoint), timeout=timeout ) except asyncio.TimeoutError: raise TimeoutError(f'Request to {endpoint} timed out after {timeout}s') async def (): vault_ids = ['vault-1', 'vault-2', 'vault-3'] vaults = await fetch_multiple_vaults(vault_ids) print(vaults) asyncio.run(()) Error Handling python import logging from typing import Callable, Any import time logger = logging.getLogger(__name__) class NoderrException(Exception): def __init__(self, message: str, code: str = None, details: dict = None): super().__init__(message) self.code = code self.details = details or {} self.timestamp = time.time() def retry_with_backoff( func: Callable, max_retries: int = 3, base_delay: int = 1, backoff_factor: float = 2.0 ) -> Any: for attempt in range(max_retries): try: return func() except Exception as e: if attempt == max_retries - 1: raise delay = base_delay * (backoff_factor ** attempt) logger.warning(f'Attempt {attempt + 1} failed. Retrying in {delay}s: {e}') time.sleep(delay) def handle_errors(func: Callable) -> Callable: def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except requests.exceptions.Timeout: raise NoderrException('Request timeout', 'TIMEOUT') except requests.exceptions.ConnectionError: raise NoderrException('Connection error', 'CONNECTION_ERROR') except json.JSONDecodeError: raise NoderrException('Invalid JSON response', 'INVALID_JSON') except Exception as e: logger.error(f'Unexpected error: {e}') raise NoderrException(str(e), 'UNKNOWN_ERROR') return wrapper @handle_errors def safe_api_call(client, endpoint): return client.get(endpoint) Best Practices Summary 1. Use session management for connection pooling 2. Implement retry logic with exponential backoff 3. Set appropriate timeouts for all requests 4. Validate all inputs before sending 5. Handle exceptions gracefully 6. Use async/await for concurrent operations 7. Log errors with context 8. Use type hints for code clarity --- See Also: REST API Documentation | GraphQL API Documentation | Smart Contract Integration --- See Also: - Noderr White Paper v7.1 - Noderr Lite Paper v3.1 - Noderr Protocol Documentation