# Examples ## Basic Examples ### Simple USDT Monitor Monitor incoming USDT transactions with console output: ```python import asyncio from trc20_monitor import TRC20Monitor, MemoryDB, ConsoleNotifier async def main(): monitor = TRC20Monitor( database=MemoryDB(), notifier=ConsoleNotifier() ) # USDT contract on Tron USDT_CONTRACT = "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t" await monitor.monitor_address( address="TYourTronAddress", contract=USDT_CONTRACT, direction="in" ) if __name__ == "__main__": asyncio.run(main()) ``` ### Exchange Deposit Monitor Monitor deposits to an exchange address with minimum amount filter: ```python import asyncio from trc20_monitor import TRC20Monitor, SQLiteDB, WebhookNotifier async def monitor_exchange_deposits(): # Use SQLite for persistent storage database = SQLiteDB("exchange_deposits.db") # Send notifications to webhook notifier = WebhookNotifier( url="https://api.exchange.com/deposit-webhook", headers={"X-API-Key": "your-api-key"} ) monitor = TRC20Monitor(database=database, notifier=notifier) # Monitor large USDT deposits (>= 1000 USDT) await monitor.monitor_address( address="TExchangeDepositAddress", contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t", direction="in", min_amount=1000.0, interval=10 # Check every 10 seconds ) asyncio.run(monitor_exchange_deposits()) ``` ## Advanced Examples ### Multi-Token Monitor Monitor multiple TRC20 tokens simultaneously: ```python import asyncio from trc20_monitor import TRC20Monitor, MemoryDB, CompositeNotifier from trc20_monitor import ConsoleNotifier, FileNotifier async def monitor_token(address: str, contract: str, token_name: str): notifier = CompositeNotifier([ ConsoleNotifier(), FileNotifier(f"{token_name}_transactions.log") ]) monitor = TRC20Monitor( database=MemoryDB(), notifier=notifier ) print(f"Starting {token_name} monitor...") await monitor.monitor_address( address=address, contract=contract, direction="both" ) async def main(): address = "TYourAddress" # Token contracts tokens = { "USDT": "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t", "USDC": "TEkxiTehnzSmSe2XqrBj4w32RUN966rdz8", "TUSD": "TUpMhErZL2fhh4sVNULAbNKLokS4GjC1F4" } # Create monitoring tasks tasks = [ monitor_token(address, contract, name) for name, contract in tokens.items() ] # Run all monitors concurrently await asyncio.gather(*tasks) asyncio.run(main()) ``` ### Custom Email Notifier Create a custom notifier that sends email alerts: ```python import asyncio import aiosmtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from trc20_monitor import NotificationAdapter, Transaction, TRC20Monitor, MemoryDB class EmailNotifier(NotificationAdapter): def __init__(self, smtp_host: str, smtp_port: int, username: str, password: str, to_emails: list): self.smtp_host = smtp_host self.smtp_port = smtp_port self.username = username self.password = password self.to_emails = to_emails async def notify(self, transaction: Transaction) -> None: # Create email message message = MIMEMultipart("alternative") message["Subject"] = f"TRC20 Alert: {transaction.amount} received" message["From"] = self.username message["To"] = ", ".join(self.to_emails) # Email body html = f"""

New TRC20 Transaction

Amount{transaction.amount}
From{transaction.from_address}
To{transaction.to_address}
Hash{transaction.tx_hash}
Block{transaction.block_number}
""" part = MIMEText(html, "html") message.attach(part) # Send email await aiosmtplib.send( message, hostname=self.smtp_host, port=self.smtp_port, username=self.username, password=self.password, use_tls=True ) async def main(): email_notifier = EmailNotifier( smtp_host="smtp.gmail.com", smtp_port=587, username="your-email@gmail.com", password="your-app-password", to_emails=["admin@example.com", "alerts@example.com"] ) monitor = TRC20Monitor( database=MemoryDB(), notifier=email_notifier ) await monitor.monitor_address( address="TYourAddress", contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t", min_amount=100.0 # Only alert for transactions >= 100 USDT ) asyncio.run(main()) ``` ### Database with Transaction History Store and query transaction history: ```python import asyncio from datetime import datetime, timedelta from trc20_monitor import TRC20Monitor, SQLiteDB, ConsoleNotifier class ExtendedSQLiteDB(SQLiteDB): async def get_transactions_since(self, timestamp: int): async with self.connection() as conn: cursor = await conn.execute( """ SELECT * FROM transactions WHERE timestamp >= ? ORDER BY timestamp DESC """, (timestamp,) ) rows = await cursor.fetchall() return [self._row_to_transaction(row) for row in rows] async def get_daily_volume(self, address: str, contract: str): yesterday = int((datetime.now() - timedelta(days=1)).timestamp()) async with self.connection() as conn: cursor = await conn.execute( """ SELECT SUM(amount) as total FROM transactions WHERE (from_address = ? OR to_address = ?) AND contract = ? AND timestamp >= ? """, (address, address, contract, yesterday) ) row = await cursor.fetchone() return row[0] if row and row[0] else 0 async def monitor_with_analytics(): db = ExtendedSQLiteDB("analytics.db") monitor = TRC20Monitor( database=db, notifier=ConsoleNotifier() ) address = "TYourAddress" contract = "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t" # Start monitoring in background monitor_task = asyncio.create_task( monitor.monitor_address(address, contract, duration=3600) ) # Periodically check analytics while not monitor_task.done(): volume = await db.get_daily_volume(address, contract) print(f"24h volume: {volume} USDT") await asyncio.sleep(300) # Check every 5 minutes await monitor_task asyncio.run(monitor_with_analytics()) ``` ### Telegram Bot Integration Send notifications to Telegram: ```python import asyncio import aiohttp from trc20_monitor import NotificationAdapter, Transaction, TRC20Monitor, MemoryDB class TelegramNotifier(NotificationAdapter): def __init__(self, bot_token: str, chat_id: str): self.bot_token = bot_token self.chat_id = chat_id self.api_url = f"https://api.telegram.org/bot{bot_token}" async def notify(self, transaction: Transaction) -> None: # Format message direction = "📥 Received" if transaction.direction == "in" else "📤 Sent" message = f""" {direction} *{transaction.amount} USDT* From: `{transaction.from_address[:10]}...` To: `{transaction.to_address[:10]}...` TX: `{transaction.tx_hash[:10]}...` Block: {transaction.block_number} """ # Send to Telegram async with aiohttp.ClientSession() as session: await session.post( f"{self.api_url}/sendMessage", json={ "chat_id": self.chat_id, "text": message, "parse_mode": "Markdown" } ) async def main(): telegram = TelegramNotifier( bot_token="YOUR_BOT_TOKEN", chat_id="YOUR_CHAT_ID" ) monitor = TRC20Monitor( database=MemoryDB(), notifier=telegram ) await monitor.monitor_address( address="TYourAddress", contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t", min_amount=10.0 ) asyncio.run(main()) ``` ### Rate-Limited Monitor Monitor with rate limiting to avoid API restrictions: ```python import asyncio from datetime import datetime from collections import deque from trc20_monitor import TRC20Monitor, MemoryDB, ConsoleNotifier class RateLimitedMonitor(TRC20Monitor): def __init__(self, *args, max_requests_per_minute=30, **kwargs): super().__init__(*args, **kwargs) self.max_requests = max_requests_per_minute self.request_times = deque(maxlen=max_requests_per_minute) async def _rate_limit(self): now = datetime.now() if len(self.request_times) >= self.max_requests: oldest = self.request_times[0] time_passed = (now - oldest).total_seconds() if time_passed < 60: sleep_time = 60 - time_passed print(f"Rate limit reached, sleeping for {sleep_time:.1f}s") await asyncio.sleep(sleep_time) self.request_times.append(now) async def main(): monitor = RateLimitedMonitor( database=MemoryDB(), notifier=ConsoleNotifier(), max_requests_per_minute=20 ) # Monitor multiple addresses addresses = ["TAddr1", "TAddr2", "TAddr3"] tasks = [] for address in addresses: task = monitor.monitor_address( address=address, contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t", interval=5 # Aggressive checking ) tasks.append(task) await asyncio.gather(*tasks) asyncio.run(main()) ``` ## Production Examples ### Docker Deployment ```python # monitor.py import os import asyncio from trc20_monitor import TRC20Monitor, SQLiteDB, WebhookNotifier async def main(): # Configuration from environment variables address = os.environ["MONITOR_ADDRESS"] contract = os.environ.get("CONTRACT_ADDRESS", "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t") webhook_url = os.environ["WEBHOOK_URL"] min_amount = float(os.environ.get("MIN_AMOUNT", "0")) interval = int(os.environ.get("CHECK_INTERVAL", "30")) # Setup components database = SQLiteDB("/data/transactions.db") notifier = WebhookNotifier( url=webhook_url, headers={"Authorization": f"Bearer {os.environ.get('API_KEY', '')}"} ) monitor = TRC20Monitor(database=database, notifier=notifier) print(f"Starting monitor for {address}") print(f"Contract: {contract}") print(f"Min amount: {min_amount}") print(f"Interval: {interval}s") await monitor.monitor_address( address=address, contract=contract, direction="both", min_amount=min_amount, interval=interval ) if __name__ == "__main__": asyncio.run(main()) ``` ```dockerfile # Dockerfile FROM python:3.11-slim WORKDIR /app RUN pip install trc20-monitor COPY monitor.py . VOLUME ["/data"] CMD ["python", "monitor.py"] ``` ```yaml # docker-compose.yml version: '3.8' services: trc20-monitor: build: . environment: - MONITOR_ADDRESS=TYourAddress - CONTRACT_ADDRESS=TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t - WEBHOOK_URL=https://your-webhook.com/endpoint - API_KEY=your-api-key - MIN_AMOUNT=100 - CHECK_INTERVAL=30 volumes: - ./data:/data restart: unless-stopped ```