Examples

Basic Examples

Simple USDT Monitor

Monitor incoming USDT transactions with console output:

import asyncio
from trc20_monitor import TRC20Monitor, MemoryDB, ConsoleNotifier

async def main():
    """Watch one address for incoming USDT transfers and report them on the console."""
    # USDT contract on Tron
    usdt_contract = "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t"

    # The monitor needs a database adapter and a notifier; this example
    # wires in MemoryDB and ConsoleNotifier.
    storage = MemoryDB()
    reporter = ConsoleNotifier()
    watcher = TRC20Monitor(database=storage, notifier=reporter)

    # direction="in" restricts the watch to transfers received by the address.
    await watcher.monitor_address(
        address="TYourTronAddress",
        contract=usdt_contract,
        direction="in",
    )

# Start the monitor only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())

Exchange Deposit Monitor

Monitor deposits to an exchange address with minimum amount filter:

import asyncio
from trc20_monitor import TRC20Monitor, SQLiteDB, WebhookNotifier

async def monitor_exchange_deposits():
    """Forward incoming USDT deposits of at least 1000 USDT to a webhook."""
    deposit_monitor = TRC20Monitor(
        # Use SQLite for persistent storage
        database=SQLiteDB("exchange_deposits.db"),
        # Send notifications to webhook
        notifier=WebhookNotifier(
            url="https://api.exchange.com/deposit-webhook",
            headers={"X-API-Key": "your-api-key"},
        ),
    )

    # Monitor large USDT deposits (>= 1000 USDT)
    watch_options = {
        "address": "TExchangeDepositAddress",
        "contract": "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
        "direction": "in",
        "min_amount": 1000.0,
        "interval": 10,  # Check every 10 seconds
    }
    await deposit_monitor.monitor_address(**watch_options)

# Guarded so that importing this module does not start the monitor.
if __name__ == "__main__":
    asyncio.run(monitor_exchange_deposits())

Advanced Examples

Multi-Token Monitor

Monitor multiple TRC20 tokens simultaneously:

import asyncio
from trc20_monitor import TRC20Monitor, MemoryDB, CompositeNotifier
from trc20_monitor import ConsoleNotifier, FileNotifier

async def monitor_token(address: str, contract: str, token_name: str):
    """Monitor one token contract for ``address`` in both directions.

    Notifications go to the console and to a per-token log file named
    ``<token_name>_transactions.log``.

    Args:
        address: Address to watch.
        contract: Contract address of the token.
        token_name: Label used in the start-up message and the log file name.
    """
    log_path = f"{token_name}_transactions.log"
    fan_out = CompositeNotifier([ConsoleNotifier(), FileNotifier(log_path)])
    token_monitor = TRC20Monitor(database=MemoryDB(), notifier=fan_out)

    print(f"Starting {token_name} monitor...")
    await token_monitor.monitor_address(
        address=address,
        contract=contract,
        direction="both",
    )

async def main():
    """Run one ``monitor_token`` coroutine per token, all concurrently."""
    address = "TYourAddress"

    # Token contracts
    contracts_by_name = {
        "USDT": "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
        "USDC": "TEkxiTehnzSmSe2XqrBj4w32RUN966rdz8",
        "TUSD": "TUpMhErZL2fhh4sVNULAbNKLokS4GjC1F4",
    }

    # Build every coroutine up front, then run all monitors concurrently
    monitors = []
    for name, contract in contracts_by_name.items():
        monitors.append(monitor_token(address, contract, name))
    await asyncio.gather(*monitors)

# Guarded so that importing this module does not start the monitors.
if __name__ == "__main__":
    asyncio.run(main())

Custom Email Notifier

Create a custom notifier that sends email alerts:

import asyncio
import aiosmtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from trc20_monitor import NotificationAdapter, Transaction, TRC20Monitor, MemoryDB

class EmailNotifier(NotificationAdapter):
    """Notification adapter that emails an HTML summary of each transaction.

    One message is sent per transaction, addressed to every entry of
    ``to_emails`` and authenticated with ``username``/``password``.

    The connection is always encrypted, and the TLS mode follows the port:
    port 465 uses implicit TLS (the socket is wrapped before any SMTP
    traffic); any other port, such as 587, connects in plain text and then
    upgrades with STARTTLS. Requesting implicit TLS on a STARTTLS port makes
    the handshake fail, which is why the mode is not hard-coded.
    """

    # Port on which SMTP servers expect TLS from the first byte (SMTPS).
    _IMPLICIT_TLS_PORT = 465

    def __init__(self, smtp_host: str, smtp_port: int,
                 username: str, password: str, to_emails: list):
        """Store the SMTP connection settings and the recipient list.

        Args:
            smtp_host: Hostname of the SMTP server.
            smtp_port: Port of the SMTP server; also selects the TLS mode.
            username: SMTP login, also used as the ``From`` address.
            password: SMTP password.
            to_emails: Recipient addresses, joined into the ``To`` header.
        """
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.to_emails = to_emails

    async def notify(self, transaction: Transaction) -> None:
        """Send one HTML email describing ``transaction``.

        Errors raised by ``aiosmtplib.send`` are not caught here and
        propagate to the caller.
        """
        # Create email message
        message = MIMEMultipart("alternative")
        message["Subject"] = f"TRC20 Alert: {transaction.amount} received"
        message["From"] = self.username
        message["To"] = ", ".join(self.to_emails)

        # Email body
        html = f"""
        <html>
            <body>
                <h2>New TRC20 Transaction</h2>
                <table border="1">
                    <tr><td>Amount</td><td>{transaction.amount}</td></tr>
                    <tr><td>From</td><td>{transaction.from_address}</td></tr>
                    <tr><td>To</td><td>{transaction.to_address}</td></tr>
                    <tr><td>Hash</td><td>{transaction.tx_hash}</td></tr>
                    <tr><td>Block</td><td>{transaction.block_number}</td></tr>
                </table>
            </body>
        </html>
        """

        part = MIMEText(html, "html")
        message.attach(part)

        # use_tls and start_tls are mutually exclusive in aiosmtplib, so
        # exactly one of them is enabled, chosen by the configured port.
        implicit_tls = self.smtp_port == self._IMPLICIT_TLS_PORT

        # Send email
        await aiosmtplib.send(
            message,
            hostname=self.smtp_host,
            port=self.smtp_port,
            username=self.username,
            password=self.password,
            use_tls=implicit_tls,
            start_tls=not implicit_tls,
        )

async def main():
    """Monitor an address and email an alert for each qualifying transaction."""
    smtp_settings = {
        "smtp_host": "smtp.gmail.com",
        "smtp_port": 587,
        "username": "your-email@gmail.com",
        "password": "your-app-password",
        "to_emails": ["admin@example.com", "alerts@example.com"],
    }
    mailer = EmailNotifier(**smtp_settings)

    watcher = TRC20Monitor(database=MemoryDB(), notifier=mailer)

    # No direction is passed, so the library's default applies.
    await watcher.monitor_address(
        address="TYourAddress",
        contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
        min_amount=100.0,  # Only alert for transactions >= 100 USDT
    )

# Guarded so that importing this module does not start the monitor.
if __name__ == "__main__":
    asyncio.run(main())

Database with Transaction History

Store and query transaction history:

import asyncio
from datetime import datetime, timedelta
from trc20_monitor import TRC20Monitor, SQLiteDB, ConsoleNotifier

class ExtendedSQLiteDB(SQLiteDB):
    """SQLiteDB with two read-only queries over the ``transactions`` table.

    Both queries compare the ``timestamp`` column against Unix timestamps in
    seconds. NOTE(review): this assumes the column stores seconds, not
    milliseconds; confirm against the schema.
    """

    async def get_transactions_since(self, timestamp: int):
        """Return every stored transaction at or after ``timestamp``, newest first."""
        async with self.connection() as db_conn:
            result = await db_conn.execute(
                """
                SELECT * FROM transactions 
                WHERE timestamp >= ? 
                ORDER BY timestamp DESC
                """,
                (timestamp,)
            )
            transactions = []
            for record in await result.fetchall():
                transactions.append(self._row_to_transaction(record))
            return transactions

    async def get_daily_volume(self, address: str, contract: str):
        """Sum the amounts moved to or from ``address`` over the last 24 hours.

        Only rows for ``contract`` are counted. Returns 0 when no row matches
        or the sum is falsy.
        """
        window_start = datetime.now() - timedelta(days=1)
        cutoff = int(window_start.timestamp())
        async with self.connection() as db_conn:
            result = await db_conn.execute(
                """
                SELECT SUM(amount) as total 
                FROM transactions 
                WHERE (from_address = ? OR to_address = ?)
                AND contract = ? 
                AND timestamp >= ?
                """,
                (address, address, contract, cutoff)
            )
            record = await result.fetchone()
            # SUM() over zero rows yields NULL, so a missing total maps to 0.
            if not record or not record[0]:
                return 0
            return record[0]

async def monitor_with_analytics():
    """Run a one-hour monitor in the background and print the 24h volume every 5 minutes.

    The reporting loop ends as soon as the monitor task finishes. An
    exception raised by the monitor is re-raised here. If the analytics
    query itself fails, the background monitor is cancelled instead of
    being left running.
    """
    db = ExtendedSQLiteDB("analytics.db")
    monitor = TRC20Monitor(
        database=db,
        notifier=ConsoleNotifier()
    )

    address = "TYourAddress"
    contract = "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t"

    # Start monitoring in background
    monitor_task = asyncio.create_task(
        monitor.monitor_address(address, contract, duration=3600)
    )

    try:
        # Periodically check analytics
        while not monitor_task.done():
            volume = await db.get_daily_volume(address, contract)
            print(f"24h volume: {volume} USDT")
            # Wait up to 5 minutes, but wake immediately when the monitor
            # finishes; a plain sleep would delay noticing that by up to
            # the full interval.
            await asyncio.wait({monitor_task}, timeout=300)

        # Propagates any exception raised inside the monitor.
        await monitor_task
    finally:
        # Only reached with a live task when the loop above raised.
        if not monitor_task.done():
            monitor_task.cancel()

# Guarded so that importing this module does not start the monitor.
if __name__ == "__main__":
    asyncio.run(monitor_with_analytics())

Telegram Bot Integration

Send notifications to Telegram:

import asyncio
import aiohttp
from trc20_monitor import NotificationAdapter, Transaction, TRC20Monitor, MemoryDB

class TelegramNotifier(NotificationAdapter):
    """Notification adapter that posts each transaction to a Telegram chat.

    Messages are sent through the Bot API ``sendMessage`` method using
    Markdown formatting.
    """

    def __init__(self, bot_token: str, chat_id: str):
        """Store the credentials and derive the bot's API base URL.

        Args:
            bot_token: Telegram bot token; it becomes part of the request URL.
            chat_id: Identifier of the chat that receives the messages.
        """
        self.bot_token = bot_token
        self.chat_id = chat_id
        self.api_url = f"https://api.telegram.org/bot{bot_token}"

    async def notify(self, transaction: Transaction) -> None:
        """Send a formatted summary of ``transaction`` to the configured chat.

        Raises:
            aiohttp.ClientResponseError: If Telegram answers with an HTTP
                error status (for example an invalid token or unknown chat).
        """
        # Format message
        direction = "📥 Received" if transaction.direction == "in" else "📤 Sent"
        message = f"""
{direction} *{transaction.amount} USDT*

From: `{transaction.from_address[:10]}...`
To: `{transaction.to_address[:10]}...`
TX: `{transaction.tx_hash[:10]}...`
Block: {transaction.block_number}
        """

        payload = {
            "chat_id": self.chat_id,
            "text": message,
            "parse_mode": "Markdown"
        }

        # Send to Telegram
        async with aiohttp.ClientSession() as session:
            # The context manager releases the response once it is handled.
            async with session.post(
                f"{self.api_url}/sendMessage", json=payload
            ) as response:
                # Connection failures already raise; make HTTP-level failures
                # raise too instead of dropping the notification silently.
                response.raise_for_status()

async def main():
    """Monitor an address and push transactions of at least 10.0 to Telegram."""
    bot = TelegramNotifier(bot_token="YOUR_BOT_TOKEN", chat_id="YOUR_CHAT_ID")
    watcher = TRC20Monitor(database=MemoryDB(), notifier=bot)

    # No direction is passed, so the library's default applies.
    watch_options = {
        "address": "TYourAddress",
        "contract": "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
        "min_amount": 10.0,
    }
    await watcher.monitor_address(**watch_options)

# Guarded so that importing this module does not start the monitor.
if __name__ == "__main__":
    asyncio.run(main())

Rate-Limited Monitor

Monitor with rate limiting to avoid API restrictions:

import asyncio
from datetime import datetime
from collections import deque
from trc20_monitor import TRC20Monitor, MemoryDB, ConsoleNotifier

class RateLimitedMonitor(TRC20Monitor):
    """TRC20Monitor with a sliding one-minute request budget.

    The timestamps of the most recent ``max_requests_per_minute`` requests
    are kept in a bounded deque. When the deque is full and its oldest entry
    is less than 60 seconds old, ``_rate_limit`` sleeps until that entry
    leaves the window.

    NOTE(review): nothing in this class awaits ``_rate_limit``. It only
    takes effect once it is called before each API request, for example
    from an override of the TRC20Monitor method that performs the request.
    """

    def __init__(self, *args, max_requests_per_minute=30, **kwargs):
        """Initialise the monitor and an empty request history.

        Args:
            *args: Forwarded to ``TRC20Monitor``.
            max_requests_per_minute: Size of the per-minute request budget;
                must be at least 1.
            **kwargs: Forwarded to ``TRC20Monitor``.

        Raises:
            ValueError: If ``max_requests_per_minute`` is smaller than 1.
        """
        # A limit of 0 would otherwise only fail later, with an IndexError
        # on the first _rate_limit() call.
        if max_requests_per_minute < 1:
            raise ValueError("max_requests_per_minute must be at least 1")
        super().__init__(*args, **kwargs)
        self.max_requests = max_requests_per_minute
        # maxlen drops the oldest timestamp automatically on append.
        self.request_times = deque(maxlen=max_requests_per_minute)

    async def _rate_limit(self):
        """Wait until another request fits in the budget, then record it."""
        now = datetime.now()
        if len(self.request_times) >= self.max_requests:
            oldest = self.request_times[0]
            time_passed = (now - oldest).total_seconds()
            if time_passed < 60:
                sleep_time = 60 - time_passed
                print(f"Rate limit reached, sleeping for {sleep_time:.1f}s")
                await asyncio.sleep(sleep_time)
                # Record when the request actually goes out, not when the
                # wait began; the stale value would under-count the window.
                now = datetime.now()
        self.request_times.append(now)

async def main():
    """Watch several addresses concurrently through one shared RateLimitedMonitor."""
    limited_monitor = RateLimitedMonitor(
        database=MemoryDB(),
        notifier=ConsoleNotifier(),
        max_requests_per_minute=20,
    )

    # Monitor multiple addresses
    watched = ["TAddr1", "TAddr2", "TAddr3"]

    # One coroutine per address, all created before any of them runs.
    watchers = [
        limited_monitor.monitor_address(
            address=target,
            contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
            interval=5,  # Aggressive checking
        )
        for target in watched
    ]

    await asyncio.gather(*watchers)

# Guarded so that importing this module does not start the monitors.
if __name__ == "__main__":
    asyncio.run(main())

Production Examples

Docker Deployment

Run the monitor in a container, configured entirely through environment variables:

# monitor.py
import os
import asyncio
from trc20_monitor import TRC20Monitor, SQLiteDB, WebhookNotifier

async def main():
    """Configure the monitor from environment variables and run it.

    Required variables: ``MONITOR_ADDRESS`` and ``WEBHOOK_URL``; a missing
    one raises ``KeyError``. Optional variables: ``CONTRACT_ADDRESS``
    (defaults to the USDT contract), ``MIN_AMOUNT`` (default 0),
    ``CHECK_INTERVAL`` in seconds (default 30) and ``API_KEY`` (default empty).
    """
    env = os.environ

    # Configuration from environment variables
    address = env["MONITOR_ADDRESS"]
    contract = env.get("CONTRACT_ADDRESS", "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t")
    webhook_url = env["WEBHOOK_URL"]
    min_amount = float(env.get("MIN_AMOUNT", "0"))
    interval = int(env.get("CHECK_INTERVAL", "30"))

    # Setup components
    monitor = TRC20Monitor(
        database=SQLiteDB("/data/transactions.db"),
        notifier=WebhookNotifier(
            url=webhook_url,
            headers={"Authorization": f"Bearer {os.environ.get('API_KEY', '')}"},
        ),
    )

    # Echo the effective configuration before the monitor starts.
    startup_banner = (
        f"Starting monitor for {address}",
        f"Contract: {contract}",
        f"Min amount: {min_amount}",
        f"Interval: {interval}s",
    )
    for line in startup_banner:
        print(line)

    await monitor.monitor_address(
        address=address,
        contract=contract,
        direction="both",
        min_amount=min_amount,
        interval=interval,
    )

# Start the monitor only when this file is executed as a script (the container's CMD).
if __name__ == "__main__":
    asyncio.run(main())
# Dockerfile
FROM python:3.11-slim

# monitor.py reports its status with print(). Without a TTY, Python
# block-buffers stdout, so those lines would reach `docker logs` late or not
# at all; unbuffered output makes them appear immediately.
ENV PYTHONUNBUFFERED=1

WORKDIR /app

# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir trc20-monitor

COPY monitor.py .

# monitor.py stores its SQLite database under /data; declaring the volume
# keeps that file outside the container's writable layer.
VOLUME ["/data"]

CMD ["python", "monitor.py"]
# docker-compose.yml
# `version` is ignored by Docker Compose v2; it is kept for older docker-compose releases.
version: "3.8"

services:
  trc20-monitor:
    # Build the image from the Dockerfile in this directory.
    build: .
    # Settings read by monitor.py; replace the placeholder values.
    environment:
      MONITOR_ADDRESS: "TYourAddress"
      CONTRACT_ADDRESS: "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t"
      WEBHOOK_URL: "https://your-webhook.com/endpoint"
      API_KEY: "your-api-key"
      MIN_AMOUNT: "100"
      CHECK_INTERVAL: "30"
    # Bind-mount ./data at /data, where monitor.py keeps its SQLite database.
    volumes:
      - "./data:/data"
    restart: "unless-stopped"