# Examples
## Basic Examples
### Simple USDT Monitor
Monitor incoming USDT transactions with console output:
```python
import asyncio
from trc20_monitor import TRC20Monitor, MemoryDB, ConsoleNotifier
async def main():
    """Watch a single address for incoming USDT transfers, reporting via ConsoleNotifier."""
    # USDT contract on Tron
    USDT_CONTRACT = "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t"

    storage = MemoryDB()
    printer = ConsoleNotifier()
    monitor = TRC20Monitor(database=storage, notifier=printer)

    # direction="in" restricts the watch to transfers received by the address.
    watch = monitor.monitor_address(
        address="TYourTronAddress",
        contract=USDT_CONTRACT,
        direction="in",
    )
    await watch


if __name__ == "__main__":
    asyncio.run(main())
```
### Exchange Deposit Monitor
Monitor deposits to an exchange address, applying a minimum-amount filter:
```python
import asyncio
from trc20_monitor import TRC20Monitor, SQLiteDB, WebhookNotifier
async def monitor_exchange_deposits():
    """Report USDT deposits of at least 1000 on the exchange address to a webhook."""
    # Use SQLite for persistent storage
    store = SQLiteDB("exchange_deposits.db")

    # Send notifications to webhook
    webhook = WebhookNotifier(
        url="https://api.exchange.com/deposit-webhook",
        headers={"X-API-Key": "your-api-key"},
    )

    deposit_monitor = TRC20Monitor(database=store, notifier=webhook)

    # Monitor large USDT deposits (>= 1000 USDT), checking every 10 seconds
    watch_options = {
        "address": "TExchangeDepositAddress",
        "contract": "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
        "direction": "in",
        "min_amount": 1000.0,
        "interval": 10,
    }
    await deposit_monitor.monitor_address(**watch_options)


asyncio.run(monitor_exchange_deposits())
```
## Advanced Examples
### Multi-Token Monitor
Monitor multiple TRC20 tokens simultaneously:
```python
import asyncio
from trc20_monitor import TRC20Monitor, MemoryDB, CompositeNotifier
from trc20_monitor import ConsoleNotifier, FileNotifier
async def monitor_token(address: str, contract: str, token_name: str):
    """Monitor one token contract for ``address`` in both directions.

    Args:
        address: Address to watch.
        contract: Contract address of the token.
        token_name: Label used in the start-up message and in the log
            file name (``<token_name>_transactions.log``).
    """
    # Each notification goes to the console and to a per-token log file.
    sinks = [
        ConsoleNotifier(),
        FileNotifier(f"{token_name}_transactions.log"),
    ]
    token_monitor = TRC20Monitor(
        database=MemoryDB(),
        notifier=CompositeNotifier(sinks),
    )

    print(f"Starting {token_name} monitor...")

    await token_monitor.monitor_address(
        address=address,
        contract=contract,
        direction="both",
    )
async def main():
    """Start one ``monitor_token`` coroutine per token and run them concurrently."""
    # Token contracts
    tokens = {
        "USDT": "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
        "USDC": "TEkxiTehnzSmSe2XqrBj4w32RUN966rdz8",
        "TUSD": "TUpMhErZL2fhh4sVNULAbNKLokS4GjC1F4",
    }
    address = "TYourAddress"

    # Create one monitoring coroutine per token and run them all concurrently
    await asyncio.gather(
        *(
            monitor_token(address, contract, name)
            for name, contract in tokens.items()
        )
    )


asyncio.run(main())
```
### Custom Email Notifier
Create a custom notifier that sends email alerts:
```python
import asyncio
import aiosmtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from trc20_monitor import NotificationAdapter, Transaction, TRC20Monitor, MemoryDB
class EmailNotifier(NotificationAdapter):
    """Notification adapter that emails an HTML summary of each transaction.

    Args:
        smtp_host: Hostname of the SMTP server.
        smtp_port: Port of the SMTP server. Port 465 is treated as implicit
            TLS; any other port (e.g. 587) is upgraded with STARTTLS.
        username: SMTP login; also used as the ``From`` address.
        password: SMTP password (for Gmail, an app password).
        to_emails: List of recipient addresses.
    """

    # Port conventionally used for SMTP over implicit TLS; other submission
    # ports start in plain text and upgrade the connection with STARTTLS.
    IMPLICIT_TLS_PORT = 465

    def __init__(self, smtp_host: str, smtp_port: int,
                 username: str, password: str, to_emails: list):
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.to_emails = to_emails

    async def notify(self, transaction: Transaction) -> None:
        """Send one email describing ``transaction`` to every recipient.

        Errors raised by ``aiosmtplib.send`` are not caught here and
        propagate to the caller.
        """
        # Create email message
        message = MIMEMultipart("alternative")
        message["Subject"] = f"TRC20 Alert: {transaction.amount} received"
        message["From"] = self.username
        message["To"] = ", ".join(self.to_emails)

        # Email body: real HTML markup, because the part is declared as
        # text/html and plain pipe-separated text would render as one line.
        html = f"""
        <html>
          <body>
            <h2>New TRC20 Transaction</h2>
            <table>
              <tr><td>Amount</td><td>{transaction.amount}</td></tr>
              <tr><td>From</td><td>{transaction.from_address}</td></tr>
              <tr><td>To</td><td>{transaction.to_address}</td></tr>
              <tr><td>Hash</td><td>{transaction.tx_hash}</td></tr>
              <tr><td>Block</td><td>{transaction.block_number}</td></tr>
            </table>
          </body>
        </html>
        """
        message.attach(MIMEText(html, "html"))

        # use_tls and start_tls are mutually exclusive in aiosmtplib: implicit
        # TLS wraps the socket from the first byte (port 465), while STARTTLS
        # upgrades a plain connection (port 587). Passing use_tls=True to a
        # STARTTLS port makes the TLS handshake fail.
        implicit_tls = self.smtp_port == self.IMPLICIT_TLS_PORT

        # Send email
        await aiosmtplib.send(
            message,
            hostname=self.smtp_host,
            port=self.smtp_port,
            username=self.username,
            password=self.password,
            use_tls=implicit_tls,
            start_tls=not implicit_tls,
        )
async def main():
    """Monitor USDT transfers for one address and email an alert for each."""
    smtp_settings = {
        "smtp_host": "smtp.gmail.com",
        "smtp_port": 587,
        "username": "your-email@gmail.com",
        "password": "your-app-password",
        "to_emails": ["admin@example.com", "alerts@example.com"],
    }
    mailer = EmailNotifier(**smtp_settings)

    alert_monitor = TRC20Monitor(database=MemoryDB(), notifier=mailer)

    # Only alert for transactions >= 100 USDT
    await alert_monitor.monitor_address(
        address="TYourAddress",
        contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
        min_amount=100.0,
    )


asyncio.run(main())
```
### Database with Transaction History
Store and query transaction history:
```python
import asyncio
from datetime import datetime, timedelta
from trc20_monitor import TRC20Monitor, SQLiteDB, ConsoleNotifier
class ExtendedSQLiteDB(SQLiteDB):
    """``SQLiteDB`` extended with read-only queries over stored transactions.

    NOTE(review): relies on ``self.connection()`` and
    ``self._row_to_transaction()`` being provided by ``SQLiteDB``, and on a
    ``transactions`` table with the columns named in the queries below.
    Confirm against the library version in use.
    """

    async def get_transactions_since(self, timestamp: int):
        """Return transactions with ``timestamp >= timestamp``, newest first.

        NOTE(review): ``timestamp`` must use the same unit as the stored
        column (seconds vs. milliseconds) — confirm against the schema.
        """
        query = """
            SELECT * FROM transactions
            WHERE timestamp >= ?
            ORDER BY timestamp DESC
        """
        async with self.connection() as conn:
            cursor = await conn.execute(query, (timestamp,))
            records = await cursor.fetchall()
            return list(map(self._row_to_transaction, records))

    async def get_daily_volume(self, address: str, contract: str):
        """Sum ``amount`` over the last 24 hours for ``address`` on ``contract``.

        Rows where the address is sender or receiver both count. Returns 0
        when no row matches (``SUM`` yields NULL) or the sum is falsy.
        """
        # The cutoff is computed in epoch seconds.
        # NOTE(review): assumes the timestamp column is also in seconds — confirm.
        cutoff = int((datetime.now() - timedelta(days=1)).timestamp())
        query = """
            SELECT SUM(amount) as total
            FROM transactions
            WHERE (from_address = ? OR to_address = ?)
            AND contract = ?
            AND timestamp >= ?
        """
        params = (address, address, contract, cutoff)
        async with self.connection() as conn:
            cursor = await conn.execute(query, params)
            result = await cursor.fetchone()
            if not result or not result[0]:
                return 0
            return result[0]
async def monitor_with_analytics():
    """Run the monitor in the background and print the 24h volume periodically.

    The volume is printed immediately and then every 5 minutes until the
    monitor task finishes. Any exception raised by the monitor task is
    re-raised here once the loop ends.
    """
    db = ExtendedSQLiteDB("analytics.db")
    monitor = TRC20Monitor(
        database=db,
        notifier=ConsoleNotifier()
    )
    address = "TYourAddress"
    contract = "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t"

    # Start monitoring in background
    monitor_task = asyncio.create_task(
        monitor.monitor_address(address, contract, duration=3600)
    )

    try:
        # Periodically check analytics
        while not monitor_task.done():
            volume = await db.get_daily_volume(address, contract)
            print(f"24h volume: {volume} USDT")
            # Wait up to 5 minutes, but wake as soon as the monitor task
            # completes or fails. A plain sleep would leave a dead monitor
            # unnoticed until the next tick.
            await asyncio.wait({monitor_task}, timeout=300)
    finally:
        # If the analytics query failed, do not leave the monitor running
        # in the background with nobody awaiting it.
        if not monitor_task.done():
            monitor_task.cancel()

    # Surface the monitor's result or exception.
    await monitor_task


asyncio.run(monitor_with_analytics())
```
### Telegram Bot Integration
Send notifications to Telegram:
```python
import asyncio
import aiohttp
from trc20_monitor import NotificationAdapter, Transaction, TRC20Monitor, MemoryDB
class TelegramNotifier(NotificationAdapter):
    """Notification adapter that posts each transaction to a Telegram chat.

    Args:
        bot_token: Telegram bot token, embedded in the Bot API base URL.
        chat_id: Identifier of the chat that receives the messages.
    """

    def __init__(self, bot_token: str, chat_id: str):
        self.bot_token = bot_token
        self.chat_id = chat_id
        self.api_url = f"https://api.telegram.org/bot{bot_token}"

    async def notify(self, transaction: Transaction) -> None:
        """Send a Markdown-formatted summary of ``transaction`` via ``sendMessage``.

        Raises:
            aiohttp.ClientResponseError: If the Bot API answers with an HTTP
                error status (e.g. bad token or chat id).
        """
        # Format message; addresses and hash are truncated to 10 characters.
        direction = "📥 Received" if transaction.direction == "in" else "📤 Sent"
        message = (
            f"\n{direction} *{transaction.amount} USDT*\n"
            f"From: `{transaction.from_address[:10]}...`\n"
            f"To: `{transaction.to_address[:10]}...`\n"
            f"TX: `{transaction.tx_hash[:10]}...`\n"
            f"Block: {transaction.block_number}\n"
        )

        payload = {
            "chat_id": self.chat_id,
            "text": message,
            "parse_mode": "Markdown",
        }

        # Send to Telegram. A new session is opened for every notification.
        async with aiohttp.ClientSession() as session:
            # Entering the response context releases the connection when
            # done; checking the status stops API errors from being
            # silently dropped.
            async with session.post(
                f"{self.api_url}/sendMessage",
                json=payload,
            ) as response:
                response.raise_for_status()
async def main():
    """Monitor USDT transfers of at least 10 for one address, notifying via Telegram."""
    bot_settings = {
        "bot_token": "YOUR_BOT_TOKEN",
        "chat_id": "YOUR_CHAT_ID",
    }
    telegram_monitor = TRC20Monitor(
        database=MemoryDB(),
        notifier=TelegramNotifier(**bot_settings),
    )

    await telegram_monitor.monitor_address(
        address="TYourAddress",
        contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
        min_amount=10.0,
    )


asyncio.run(main())
```
### Rate-Limited Monitor
Monitor with rate limiting to avoid API restrictions:
```python
import asyncio
from datetime import datetime
from collections import deque
from trc20_monitor import TRC20Monitor, MemoryDB, ConsoleNotifier
class RateLimitedMonitor(TRC20Monitor):
    """``TRC20Monitor`` with a sliding one-minute request limiter.

    NOTE(review): nothing in this example calls ``_rate_limit``. For the
    limit to take effect it must be awaited before each API request; the
    right hook depends on ``TRC20Monitor`` internals — confirm against the
    library.

    Args:
        max_requests_per_minute: Maximum number of requests allowed in any
            60-second window. Must be at least 1.
        *args, **kwargs: Forwarded unchanged to ``TRC20Monitor``.

    Raises:
        ValueError: If ``max_requests_per_minute`` is less than 1.
    """

    def __init__(self, *args, max_requests_per_minute=30, **kwargs):
        # A zero-length window would make _rate_limit index an empty deque.
        if max_requests_per_minute < 1:
            raise ValueError("max_requests_per_minute must be at least 1")
        super().__init__(*args, **kwargs)
        self.max_requests = max_requests_per_minute
        # Timestamps of the most recent requests; maxlen drops the oldest
        # entry automatically once the window is full.
        self.request_times = deque(maxlen=max_requests_per_minute)
        # Serializes limiter checks so concurrent monitor tasks cannot all
        # pass the check at once and exceed the limit together.
        self._rate_lock = asyncio.Lock()

    async def _rate_limit(self):
        """Wait until another request fits in the window, then record it."""
        async with self._rate_lock:
            if len(self.request_times) >= self.max_requests:
                oldest = self.request_times[0]
                time_passed = (datetime.now() - oldest).total_seconds()
                if time_passed < 60:
                    sleep_time = 60 - time_passed
                    print(f"Rate limit reached, sleeping for {sleep_time:.1f}s")
                    await asyncio.sleep(sleep_time)
            # Record the time the request actually proceeds. A timestamp taken
            # before the sleep would make the window look older than it is.
            self.request_times.append(datetime.now())
async def main():
    """Monitor several addresses concurrently through one ``RateLimitedMonitor``."""
    limited_monitor = RateLimitedMonitor(
        database=MemoryDB(),
        notifier=ConsoleNotifier(),
        max_requests_per_minute=20,
    )

    # Monitor multiple addresses
    addresses = ["TAddr1", "TAddr2", "TAddr3"]

    # One coroutine per address; interval=5 is aggressive checking
    watchers = [
        limited_monitor.monitor_address(
            address=watched,
            contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
            interval=5,
        )
        for watched in addresses
    ]

    await asyncio.gather(*watchers)


asyncio.run(main())
```
## Production Examples
### Docker Deployment
```python
# monitor.py
import os
import asyncio
from trc20_monitor import TRC20Monitor, SQLiteDB, WebhookNotifier
async def main():
    """Run a webhook-notifying monitor configured from environment variables.

    Required variables: ``MONITOR_ADDRESS``, ``WEBHOOK_URL``.
    Optional variables: ``CONTRACT_ADDRESS`` (defaults to the USDT contract),
    ``MIN_AMOUNT`` (default 0), ``CHECK_INTERVAL`` in seconds (default 30),
    ``API_KEY`` (sent as a bearer token when set).

    Raises:
        KeyError: If a required variable is missing.
        ValueError: If ``MIN_AMOUNT`` or ``CHECK_INTERVAL`` is not numeric.
    """
    # Configuration from environment variables
    address = os.environ["MONITOR_ADDRESS"]
    contract = os.environ.get("CONTRACT_ADDRESS", "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t")
    webhook_url = os.environ["WEBHOOK_URL"]
    min_amount = float(os.environ.get("MIN_AMOUNT", "0"))
    interval = int(os.environ.get("CHECK_INTERVAL", "30"))
    api_key = os.environ.get("API_KEY", "")

    # Only send an Authorization header when a key is configured; otherwise
    # the webhook would receive a malformed "Bearer " header with no token.
    headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}

    # Setup components
    database = SQLiteDB("/data/transactions.db")
    notifier = WebhookNotifier(url=webhook_url, headers=headers)
    monitor = TRC20Monitor(database=database, notifier=notifier)

    # flush=True so the start-up banner shows up right away in container
    # logs, where stdout is not a TTY and would otherwise be block-buffered.
    print(f"Starting monitor for {address}", flush=True)
    print(f"Contract: {contract}", flush=True)
    print(f"Min amount: {min_amount}", flush=True)
    print(f"Interval: {interval}s", flush=True)

    await monitor.monitor_address(
        address=address,
        contract=contract,
        direction="both",
        min_amount=min_amount,
        interval=interval
    )


if __name__ == "__main__":
    asyncio.run(main())
```
```dockerfile
# Dockerfile
FROM python:3.11-slim

# Python block-buffers stdout when it is not a TTY; disable buffering so the
# monitor's print() output appears in `docker logs` immediately.
ENV PYTHONUNBUFFERED=1

WORKDIR /app

# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir trc20-monitor

COPY monitor.py .

# monitor.py stores its SQLite database under /data.
VOLUME ["/data"]

CMD ["python", "monitor.py"]
```
```yaml
# docker-compose.yml
version: "3.8"

services:
  trc20-monitor:
    build: .
    restart: unless-stopped
    # Settings read by monitor.py through os.environ.
    environment:
      MONITOR_ADDRESS: TYourAddress
      CONTRACT_ADDRESS: TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t
      WEBHOOK_URL: https://your-webhook.com/endpoint
      API_KEY: your-api-key
      MIN_AMOUNT: "100"
      CHECK_INTERVAL: "30"
    # Host directory ./data is mounted at /data, where monitor.py keeps its SQLite database.
    volumes:
      - ./data:/data
```