Usage Guide
Basic Usage
Monitoring Incoming Transactions
import asyncio
from trc20_monitor import TRC20Monitor, MemoryDB, ConsoleNotifier
async def monitor_incoming():
monitor = TRC20Monitor(
database=MemoryDB(),
notifier=ConsoleNotifier()
)
await monitor.monitor_address(
address="TYourAddress",
contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t", # USDT
direction="in",
interval=30
)
asyncio.run(monitor_incoming())
Monitoring Outgoing Transactions
async def monitor_outgoing():
monitor = TRC20Monitor(
database=MemoryDB(),
notifier=ConsoleNotifier()
)
await monitor.monitor_address(
address="TYourAddress",
contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
direction="out",
interval=30
)
Monitoring Both Directions
async def monitor_both():
monitor = TRC20Monitor(
database=MemoryDB(),
notifier=ConsoleNotifier()
)
await monitor.monitor_address(
address="TYourAddress",
contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
direction="both",
interval=30
)
Advanced Features
Filtering by Amount
Only process transactions above a certain threshold:
await monitor.monitor_address(
address="TYourAddress",
contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
direction="in",
min_amount=100.0 # Only transactions >= 100 USDT
)
Custom Check Interval
Adjust how frequently the monitor checks for new transactions:
await monitor.monitor_address(
address="TYourAddress",
contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
interval=10 # Check every 10 seconds
)
Limited Duration Monitoring
Monitor for a specific duration:
await monitor.monitor_address(
address="TYourAddress",
contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
duration=3600 # Monitor for 1 hour
)
Storage Options
SQLite Database
For persistent storage:
from trc20_monitor import SQLiteDB
monitor = TRC20Monitor(
database=SQLiteDB("transactions.db"),
notifier=ConsoleNotifier()
)
Custom Database
Implement your own database adapter:
from trc20_monitor import DatabaseAdapter
class CustomDB(DatabaseAdapter):
async def save_transaction(self, transaction):
# Your implementation
pass
async def get_latest_transaction(self, address, contract, direction):
# Your implementation
pass
async def transaction_exists(self, tx_hash):
# Your implementation
pass
Notification Options
File Notifier
Write notifications to a file:
from trc20_monitor import FileNotifier
monitor = TRC20Monitor(
database=MemoryDB(),
notifier=FileNotifier("transactions.log")
)
Webhook Notifier
Send notifications to a webhook:
from trc20_monitor import WebhookNotifier
monitor = TRC20Monitor(
database=MemoryDB(),
notifier=WebhookNotifier(
url="https://your-webhook.com/endpoint",
headers={"Authorization": "Bearer YOUR_TOKEN"}
)
)
Custom Notifier
Create your own notification handler:
from trc20_monitor import NotificationAdapter
class EmailNotifier(NotificationAdapter):
async def notify(self, transaction):
# Send email notification
await send_email(
to="admin@example.com",
subject=f"New transaction: {transaction.amount} USDT",
body=f"Transaction details: {transaction}"
)
Multiple Notifiers
Use multiple notification channels:
from trc20_monitor import CompositeNotifier
notifier = CompositeNotifier([
ConsoleNotifier(),
FileNotifier("transactions.log"),
WebhookNotifier("https://webhook.com/endpoint")
])
monitor = TRC20Monitor(
database=MemoryDB(),
notifier=notifier
)
Error Handling
Retry Logic
The monitor includes built-in retry logic:
monitor = TRC20Monitor(
database=MemoryDB(),
notifier=ConsoleNotifier(),
max_retries=5, # Retry failed requests up to 5 times
retry_delay=2 # Wait 2 seconds between retries
)
Custom Error Handling
try:
await monitor.monitor_address(
address="TYourAddress",
contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t"
)
except MonitorError as e:
print(f"Monitoring error: {e}")
except NetworkError as e:
print(f"Network error: {e}")
Using the CLI
Basic Monitoring
# Monitor incoming USDT transactions
trc20-monitor monitor TYourAddress TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t
# Monitor outgoing transactions
trc20-monitor monitor TYourAddress TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t --direction out
# Monitor both directions
trc20-monitor monitor TYourAddress TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t --direction both
With Custom Storage
# Use SQLite database
trc20-monitor monitor TYourAddress TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t \\
--db-type sqlite --db-path transactions.db
With Notifications
# File notifications
trc20-monitor monitor TYourAddress TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t \\
--notifier file --file-path alerts.log
# Webhook notifications
trc20-monitor monitor TYourAddress TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t \\
--notifier webhook --webhook-url https://webhook.com/endpoint
Advanced Options
# Full example with all options
trc20-monitor monitor TYourAddress TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t \\
--direction both \\
--interval 10 \\
--min-amount 100 \\
--duration 3600 \\
--db-type sqlite \\
--db-path transactions.db \\
--notifier webhook \\
--webhook-url https://webhook.com/endpoint
Performance Optimization
Batch Processing
Process multiple addresses simultaneously:
async def monitor_multiple():
monitors = []
addresses = ["TAddr1", "TAddr2", "TAddr3"]
for address in addresses:
monitor = TRC20Monitor(
database=MemoryDB(),
notifier=ConsoleNotifier()
)
task = monitor.monitor_address(
address=address,
contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t"
)
monitors.append(task)
await asyncio.gather(*monitors)
Connection Pooling
For high-volume monitoring:
monitor = TRC20Monitor(
database=SQLiteDB("transactions.db"),
notifier=ConsoleNotifier(),
connection_pool_size=10 # Use connection pooling
)
Best Practices
Use persistent storage for production environments
Implement proper error handling and logging
Set appropriate intervals to avoid rate limiting
Use webhooks for real-time notifications
Monitor system resources when running multiple monitors
Implement data retention policies for large-scale deployments
Use environment variables for sensitive configuration