Python Integration Guide

Home > Developers > Python Integration

A guide to integrating with the Noderr Protocol using Python, with production-ready code examples.

# REST API Client Setup

import os
from typing import Any, Dict, Optional

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class NoderrClientError(Exception):
    """Raised when a request made through NoderrClient fails."""


class NoderrClient:
    """Thin REST client built on a shared, retrying ``requests.Session``.

    Args:
        api_key: API key sent as a bearer token. Falls back to the
            ``NODERR_API_KEY`` environment variable when omitted.
        base_url: Prefix prepended to every endpoint path.
    """

    def __init__(self, api_key: Optional[str] = None, base_url: str = 'https://api.noderr.xyz/v1'):
        self.api_key = api_key or os.getenv('NODERR_API_KEY')
        self.base_url = base_url
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Build a session with retry/backoff and the default headers."""
        session = requests.Session()
        # NOTE(review): POST is retried on 5xx responses as in the original
        # guide; confirm deposit/withdraw are idempotent server-side, otherwise
        # a retried request could be applied twice.
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=['GET', 'POST', 'PUT', 'DELETE'],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        session.headers.update({'Content-Type': 'application/json'})
        # Only send credentials when a key is configured; otherwise the header
        # would literally read "Bearer None".
        if self.api_key:
            session.headers.update({'Authorization': f'Bearer {self.api_key}'})
        return session

    def _request(self, method: str, endpoint: str, **kwargs: Any) -> Dict[str, Any]:
        """Send one request and decode the JSON body.

        Raises:
            NoderrClientError: On transport errors, non-2xx responses, or a
                body that is not valid JSON.
        """
        try:
            response = self.session.request(
                method,
                f'{self.base_url}{endpoint}',
                timeout=10,
                **kwargs,
            )
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures on requests versions
            # whose decode error is not a RequestException.
            raise NoderrClientError(f'{method} request failed: {str(e)}') from e

    def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        """GET ``endpoint`` with optional query parameters; return parsed JSON."""
        return self._request('GET', endpoint, params=params)

    def post(self, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
        """POST ``data`` as a JSON body to ``endpoint``; return parsed JSON."""
        return self._request('POST', endpoint, json=data)

    def get_vault(self, vault_id: str) -> Dict[str, Any]:
        """Fetch ``/vaults/{vault_id}``."""
        return self.get(f'/vaults/{vault_id}')

    def deposit(self, vault_id: str, amount: float) -> Dict[str, Any]:
        """POST ``{'amount': amount}`` to ``/vaults/{vault_id}/deposit``."""
        return self.post(f'/vaults/{vault_id}/deposit', {'amount': amount})

    def withdraw(self, vault_id: str, shares: float) -> Dict[str, Any]:
        """POST ``{'shares': shares}`` to ``/vaults/{vault_id}/withdraw``."""
        return self.post(f'/vaults/{vault_id}/withdraw', {'shares': shares})

# GraphQL Queries

import json
from typing import Any, Dict, Optional

import requests


class GraphQLError(Exception):
    """Raised when a GraphQL request fails or the response reports errors."""


class GraphQLClient:
    """Minimal GraphQL-over-HTTP client.

    Args:
        endpoint: URL that queries are POSTed to.
        api_key: Optional key sent as a bearer token.
    """

    def __init__(self, endpoint: str, api_key: Optional[str] = None):
        self.endpoint = endpoint
        self.headers = {'Content-Type': 'application/json'}
        # Omit the header entirely for anonymous use rather than sending an
        # empty Authorization value.
        if api_key:
            self.headers['Authorization'] = f'Bearer {api_key}'

    def execute(self, query: str, variables: Optional[Dict] = None) -> Dict[str, Any]:
        """Run ``query`` and return the ``data`` member of the response.

        Args:
            query: GraphQL document.
            variables: Values for the query's variables; defaults to ``{}``.

        Raises:
            GraphQLError: On transport/HTTP failure, a non-JSON body, a
                response containing ``errors``, or a response without ``data``.
        """
        payload = {
            'query': query,
            'variables': variables or {},
        }
        try:
            response = requests.post(
                self.endpoint,
                json=payload,
                headers=self.headers,
                timeout=10,
            )
            response.raise_for_status()
            result = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            raise GraphQLError(f'Request failed: {str(e)}') from e

        if not isinstance(result, dict):
            raise GraphQLError(f'Unexpected response payload: {result!r}')
        if 'errors' in result:
            raise GraphQLError(result['errors'])
        if 'data' not in result:
            raise GraphQLError('Response contained neither data nor errors')
        return result['data']


# Example queries
GET_VAULT_QUERY = ''' query GetVault($id: ID!) { vault(id: $id) { id name totalAssets apy strategies { id name allocation performance } } } '''

GET_USER_VOTES_QUERY = ''' query GetUserVotes($userId: ID!) { user(id: $userId) { id votes { proposalId vote votingPower timestamp } } } '''

GET_VAULT_PERFORMANCE_QUERY = ''' query GetVaultPerformance($vaultId: ID!, $period: String!) { vaultPerformance(vaultId: $vaultId, period: $period) { returns volatility sharpeRatio maxDrawdown } } '''

GET_PROPOSALS_QUERY = ''' query GetProposals($status: String!) { proposals(status: $status) { id title description votesFor votesAgainst endTime } } '''

# Usage
client = GraphQLClient('https://api.noderr.xyz/graphql')
vault_data = client.execute(GET_VAULT_QUERY, {'id': 'vault-1'})

# WebSocket Connections

import asyncio
import json
import logging
from typing import Callable, Optional

import websockets

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class WebSocketClient:
    """Small wrapper around a single ``websockets`` connection.

    Args:
        uri: WebSocket URL to connect to.
        api_key: Optional key sent as a bearer token during the handshake.
    """

    def __init__(self, uri: str, api_key: Optional[str] = None):
        self.uri = uri
        self.api_key = api_key
        self.websocket = None
        self.running = False

    async def connect(self):
        """Open the connection; log and re-raise any failure."""
        # Skip the header when no key is set instead of sending "Bearer None".
        headers = {'Authorization': f'Bearer {self.api_key}'} if self.api_key else {}
        try:
            # NOTE(review): newer websockets releases appear to name this
            # argument ``additional_headers`` on the default client -- confirm
            # against the version pinned for this project.
            self.websocket = await websockets.connect(
                self.uri,
                extra_headers=headers,
            )
            self.running = True
            logger.info('WebSocket connected')
        except Exception as e:
            logger.error(f'Connection failed: {e}')
            raise

    async def subscribe(self, channel: str, callback: Callable):
        """Subscribe to ``channel`` and await ``callback`` for every message.

        Connects first if needed. This coroutine keeps running until the
        message stream ends or the connection closes.
        """
        if not self.websocket:
            await self.connect()

        subscribe_msg = {
            'type': 'subscribe',
            'channel': channel,
        }
        await self.websocket.send(json.dumps(subscribe_msg))

        try:
            async for message in self.websocket:
                data = json.loads(message)
                await callback(data)
        except websockets.exceptions.ConnectionClosed:
            logger.warning('WebSocket connection closed')
        finally:
            # The loop can also end without an exception; either way the
            # client is no longer receiving.
            self.running = False

    async def unsubscribe(self, channel: str):
        """Send an unsubscribe message for ``channel`` if connected."""
        if self.websocket:
            unsubscribe_msg = {
                'type': 'unsubscribe',
                'channel': channel,
            }
            await self.websocket.send(json.dumps(unsubscribe_msg))

    async def close(self):
        """Close the connection if one is open and mark the client stopped."""
        if self.websocket:
            await self.websocket.close()
            self.running = False


# Usage
async def on_price_update(data):
    print(f'Price update: {data}')


async def on_apy_change(data):
    print(f'APY change: {data}')


async def main():
    ws_client = WebSocketClient('wss://api.noderr.xyz/ws')
    try:
        await ws_client.subscribe('prices:ETH', on_price_update)
    finally:
        # Release the socket even if subscribe() exits with an error.
        await ws_client.close()


if __name__ == '__main__':
    asyncio.run(main())

# Smart Contract Calls
#
# Uses the snake_case helper names of current web3.py releases
# (``to_checksum_address``, ``to_wei``, ``from_wei``, ``build_transaction``),
# matching the snake_case ``w3.eth`` methods this section already relies on.

import json
import os
from decimal import Decimal
from typing import Union

from web3 import Web3
from web3.contract import Contract


class ContractCallError(Exception):
    """Raised when a vault contract read or write fails."""


class TransactionError(Exception):
    """Raised when waiting for a transaction receipt fails."""


# Initialize Web3 provider
w3 = Web3(Web3.HTTPProvider(os.getenv('RPC_URL')))

# Contract configuration (placeholders -- substitute the real address and ABI)
VAULT_ADDRESS = '0x...'
VAULT_ABI = json.loads('''[...]''')

# Create contract instance
vault_contract: Contract = w3.eth.contract(
    address=Web3.to_checksum_address(VAULT_ADDRESS),
    abi=VAULT_ABI,
)


def _raw_transaction_bytes(signed_tx):
    """Return the signed payload under either attribute spelling."""
    # Newer eth-account exposes ``raw_transaction``; older releases use
    # ``rawTransaction``.
    raw = getattr(signed_tx, 'raw_transaction', None)
    return raw if raw is not None else signed_tx.rawTransaction


def _sign_and_send(contract_function, account, private_key: str) -> str:
    """Build, sign and broadcast ``contract_function``; return the tx hash as hex."""
    tx = contract_function.build_transaction({
        'from': account.address,
        'nonce': w3.eth.get_transaction_count(account.address),
        'gas': 200000,
        'gasPrice': w3.eth.gas_price,
    })
    signed_tx = w3.eth.account.sign_transaction(tx, private_key)
    tx_hash = w3.eth.send_raw_transaction(_raw_transaction_bytes(signed_tx))
    # to_hex always yields a 0x-prefixed string regardless of hexbytes version.
    return Web3.to_hex(tx_hash)


# Get vault balance
def get_vault_balance(user_address: str) -> Union[int, Decimal]:
    """Return ``balanceOf(user_address)`` converted from wei to ether units.

    Raises:
        ContractCallError: If the address is invalid or the call fails.
    """
    try:
        balance = vault_contract.functions.balanceOf(
            Web3.to_checksum_address(user_address)
        ).call()
        return Web3.from_wei(balance, 'ether')
    except Exception as e:
        raise ContractCallError(f'Failed to get balance: {e}') from e


# Deposit to vault
def deposit_to_vault(amount: float, private_key: str) -> str:
    """Call ``deposit(amount_wei, sender)`` and return the transaction hash.

    Args:
        amount: Amount in ether units; converted to wei before sending.
        private_key: Key used to derive the sender and sign the transaction.

    Raises:
        ContractCallError: If building, signing or sending fails.
    """
    try:
        account = w3.eth.account.from_key(private_key)
        amount_wei = Web3.to_wei(amount, 'ether')
        call = vault_contract.functions.deposit(amount_wei, account.address)
        return _sign_and_send(call, account, private_key)
    except Exception as e:
        raise ContractCallError(f'Deposit failed: {e}') from e


# Withdraw from vault
def withdraw_from_vault(shares: float, private_key: str) -> str:
    """Call ``withdraw(shares_wei, sender, sender)`` and return the transaction hash.

    Args:
        shares: Share amount in ether units; converted to wei before sending.
        private_key: Key used to derive the sender and sign the transaction.

    Raises:
        ContractCallError: If building, signing or sending fails.
    """
    try:
        account = w3.eth.account.from_key(private_key)
        shares_wei = Web3.to_wei(shares, 'ether')
        call = vault_contract.functions.withdraw(
            shares_wei,
            account.address,
            account.address,
        )
        return _sign_and_send(call, account, private_key)
    except Exception as e:
        raise ContractCallError(f'Withdrawal failed: {e}') from e


# Monitor transaction status
def wait_for_transaction(tx_hash: str, timeout: int = 300) -> dict:
    """Wait for the receipt of ``tx_hash``.

    Returns:
        ``{'success': bool, 'receipt': receipt}`` where ``success`` is true
        when the receipt's ``status`` equals 1.

    Raises:
        TransactionError: If waiting for the receipt fails (including timeout).
    """
    try:
        receipt = w3.eth.wait_for_transaction_receipt(
            tx_hash,
            timeout=timeout,
        )
        return {'success': receipt['status'] == 1, 'receipt': receipt}
    except Exception as e:
        raise TransactionError(f'Failed to monitor transaction: {e}') from e

# Async/Await Patterns

import asyncio
from typing import Any, Dict, List


async def fetch_multiple_vaults(vault_ids: List[str], client=None) -> List[Dict[str, Any]]:
    """Fetch several vaults concurrently, skipping the ones that fail.

    Each blocking ``client.get`` call runs in a worker thread.

    Args:
        vault_ids: Vault identifiers to fetch.
        client: Object with a ``get(endpoint)`` method. When omitted, a
            ``NoderrClient`` (see the REST API Client Setup section) is created.

    Returns:
        Successful results in the order of ``vault_ids``; failures are
        printed and left out.
    """
    if client is None:
        client = NoderrClient()

    tasks = [
        asyncio.to_thread(client.get, f'/vaults/{vault_id}')
        for vault_id in vault_ids
    ]
    # return_exceptions=True keeps one failure from cancelling the rest.
    results = await asyncio.gather(*tasks, return_exceptions=True)

    vaults = []
    for vault_id, result in zip(vault_ids, results):
        if isinstance(result, Exception):
            print(f'Error fetching vault {vault_id}: {result}')
        else:
            vaults.append(result)
    return vaults


async def fetch_with_timeout(client, endpoint: str, timeout: int = 10) -> Dict:
    """Run ``client.get(endpoint)`` in a thread, bounded by ``timeout`` seconds.

    Raises:
        TimeoutError: If no result arrives within ``timeout`` seconds.
    """
    try:
        return await asyncio.wait_for(
            asyncio.to_thread(client.get, endpoint),
            timeout=timeout,
        )
    except asyncio.TimeoutError as e:
        raise TimeoutError(f'Request to {endpoint} timed out after {timeout}s') from e


async def main():
    vault_ids = ['vault-1', 'vault-2', 'vault-3']
    vaults = await fetch_multiple_vaults(vault_ids)
    print(vaults)


if __name__ == '__main__':
    asyncio.run(main())

# Error Handling

import functools
import json
import logging
import time
from typing import Any, Callable, Optional

import requests

logger = logging.getLogger(__name__)


class NoderrException(Exception):
    """Error carrying a machine-readable code, extra details and a timestamp.

    Args:
        message: Human-readable description.
        code: Optional short error code (e.g. ``'TIMEOUT'``).
        details: Optional extra context; stored as ``{}`` when omitted.
    """

    def __init__(self, message: str, code: Optional[str] = None, details: Optional[dict] = None):
        super().__init__(message)
        self.code = code
        self.details = details or {}
        self.timestamp = time.time()


def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay: int = 1,
    backoff_factor: float = 2.0,
) -> Any:
    """Call ``func()`` until it succeeds or ``max_retries`` attempts are used.

    The wait before retry ``k`` (0-based) is ``base_delay * backoff_factor ** k``.

    Returns:
        Whatever ``func`` returns on its first successful call.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: The exception from the final attempt, unchanged.
    """
    if max_retries < 1:
        # Without this guard the loop never runs and None is returned silently.
        raise ValueError(f'max_retries must be at least 1: {max_retries}')

    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (backoff_factor ** attempt)
            logger.warning(f'Attempt {attempt + 1} failed. Retrying in {delay}s: {e}')
            time.sleep(delay)


def handle_errors(func: Callable) -> Callable:
    """Decorator translating common failures into ``NoderrException``.

    Timeouts, connection errors and JSON decode errors get dedicated codes;
    any other exception is logged and wrapped with code ``'UNKNOWN_ERROR'``.
    A ``NoderrException`` raised by ``func`` passes through untouched.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except NoderrException:
            # Already in the target form; keep its original code.
            raise
        except requests.exceptions.Timeout as e:
            raise NoderrException('Request timeout', 'TIMEOUT') from e
        except requests.exceptions.ConnectionError as e:
            raise NoderrException('Connection error', 'CONNECTION_ERROR') from e
        except json.JSONDecodeError as e:
            raise NoderrException('Invalid JSON response', 'INVALID_JSON') from e
        except Exception as e:
            logger.error(f'Unexpected error: {e}')
            raise NoderrException(str(e), 'UNKNOWN_ERROR') from e

    return wrapper


@handle_errors
def safe_api_call(client, endpoint):
    """Return ``client.get(endpoint)`` with errors translated by ``handle_errors``."""
    return client.get(endpoint)

# Data Validation

import math
import string
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


class ValidationError(Exception):
    """Raised when input data fails validation."""


@dataclass
class VaultData:
    """Typed view of a vault record."""

    id: str
    name: str
    total_assets: float
    apy: float
    created_at: datetime

    @classmethod
    def from_dict(cls, data: dict) -> 'VaultData':
        """Build a ``VaultData`` from a dict with camelCase keys.

        Reads ``id``, ``name``, ``totalAssets``, ``apy`` and ``createdAt``
        (an ISO-8601 string).

        Raises:
            ValidationError: If a key is missing or a value cannot be converted.
        """
        try:
            return cls(
                id=data['id'],
                name=data['name'],
                total_assets=float(data['totalAssets']),
                apy=float(data['apy']),
                created_at=datetime.fromisoformat(data['createdAt']),
            )
        except (KeyError, ValueError, TypeError) as e:
            raise ValidationError(f'Invalid vault data: {e}') from e


def validate_address(address: str) -> str:
    """Return ``address`` if it is ``0x`` followed by exactly 40 hex digits.

    Raises:
        ValidationError: For non-strings, a wrong prefix or length, or
            non-hexadecimal characters.
    """
    well_formed = (
        isinstance(address, str)
        and address.startswith('0x')
        and len(address) == 42
        and all(ch in string.hexdigits for ch in address[2:])
    )
    if not well_formed:
        raise ValidationError(f'Invalid Ethereum address: {address}')
    return address


def validate_amount(amount: float) -> float:
    """Return ``amount`` if it is a finite number greater than zero.

    Raises:
        ValidationError: If ``amount`` is zero, negative, NaN or infinite.
    """
    # NaN compares false against everything, so it must be rejected explicitly.
    if not math.isfinite(amount) or amount <= 0:
        raise ValidationError(f'Amount must be positive: {amount}')
    return amount


# Usage
vault_dict = {
    'id': '1',
    'name': 'Vault 1',
    'totalAssets': '1000',
    'apy': '0.08',
    'createdAt': '2024-01-01T00:00:00',
}
vault = VaultData.from_dict(vault_dict)

Best Practices Summary

1. Use session management for connection pooling
2. Implement retry logic with exponential backoff
3. Set appropriate timeouts for all requests
4. Validate all inputs before sending
5. Handle exceptions gracefully
6. Use async/await for concurrent operations
7. Log errors with context
8. Use type hints for code clarity

---

See Also: REST API Documentation | GraphQL API Documentation | Smart Contract Integration

---

See Also:
- Noderr White Paper v7.1
- Noderr Lite Paper v3.1
- Noderr Protocol Documentation

results matching ""

    No results matching ""