Usage Guide

Basic Usage

Monitoring Incoming Transactions

import asyncio
from trc20_monitor import TRC20Monitor, MemoryDB, ConsoleNotifier

async def monitor_incoming():
    monitor = TRC20Monitor(
        database=MemoryDB(),
        notifier=ConsoleNotifier()
    )
    
    await monitor.monitor_address(
        address="TYourAddress",
        contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",  # USDT
        direction="in",
        interval=30
    )

asyncio.run(monitor_incoming())

Monitoring Outgoing Transactions

async def monitor_outgoing():
    monitor = TRC20Monitor(
        database=MemoryDB(),
        notifier=ConsoleNotifier()
    )
    
    await monitor.monitor_address(
        address="TYourAddress",
        contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
        direction="out",
        interval=30
    )

Monitoring Both Directions

async def monitor_both():
    monitor = TRC20Monitor(
        database=MemoryDB(),
        notifier=ConsoleNotifier()
    )
    
    await monitor.monitor_address(
        address="TYourAddress",
        contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
        direction="both",
        interval=30
    )

Advanced Features

Filtering by Amount

Only process transactions at or above a minimum amount:

await monitor.monitor_address(
    address="TYourAddress",
    contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
    direction="in",
    min_amount=100.0  # Only transactions >= 100 USDT
)
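
TRC20 transfers carry their amount on chain as an integer in the token's smallest unit, and USDT on TRON uses 6 decimals, so 100 USDT appears as 100000000. The sketch below shows the kind of comparison a `min_amount` filter implies, assuming the threshold is given in whole tokens. The function names are illustrative and are not part of `trc20_monitor`.

```python
from decimal import Decimal

USDT_DECIMALS = 6  # USDT on TRON uses 6 decimal places


def to_token_units(raw_value: int, decimals: int = USDT_DECIMALS) -> Decimal:
    """Convert an on-chain integer amount to whole-token units."""
    return Decimal(raw_value).scaleb(-decimals)


def meets_threshold(raw_value: int, min_amount: str, decimals: int = USDT_DECIMALS) -> bool:
    """Return True if the transfer is at least min_amount tokens."""
    return to_token_units(raw_value, decimals) >= Decimal(min_amount)
```

Using `Decimal` rather than `float` avoids rounding surprises when an amount sits exactly on the threshold.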

Custom Check Interval

Adjust how frequently the monitor checks for new transactions:

await monitor.monitor_address(
    address="TYourAddress",
    contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
    interval=10  # Check every 10 seconds
)

Limited Duration Monitoring

Stop monitoring after a fixed number of seconds:

await monitor.monitor_address(
    address="TYourAddress",
    contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t",
    duration=3600  # Monitor for 1 hour
)
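
A coroutine can also be bounded from the outside with the standard library's `asyncio.wait_for`, independently of any `duration` argument. The sketch below is a generic pattern; `fake_monitor` is a hypothetical stand-in for a long-running monitor call.

```python
import asyncio


async def run_for(coro, seconds: float) -> bool:
    """Run coro for at most `seconds`; return True if it finished on its own."""
    try:
        await asyncio.wait_for(coro, timeout=seconds)
        return True
    except asyncio.TimeoutError:
        # wait_for cancels the coroutine before raising
        return False


async def fake_monitor() -> None:
    """Stand-in for a monitor loop that never returns (hypothetical)."""
    while True:
        await asyncio.sleep(0.01)
```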

Storage Options

SQLite Database

For persistent storage:

from trc20_monitor import SQLiteDB

monitor = TRC20Monitor(
    database=SQLiteDB("transactions.db"),
    notifier=ConsoleNotifier()
)

Custom Database

Implement your own database adapter:

from trc20_monitor import DatabaseAdapter

class CustomDB(DatabaseAdapter):
    async def save_transaction(self, transaction):
        # Your implementation
        pass
    
    async def get_latest_transaction(self, address, contract, direction):
        # Your implementation
        pass
    
    async def transaction_exists(self, tx_hash):
        # Your implementation
        pass
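
To make the three methods concrete, here is a minimal dictionary-backed sketch. It is self-contained, so it does not subclass `DatabaseAdapter`, and the `Tx` record with its field names is an assumption made for illustration; the library's real transaction object may look different.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class Tx:
    """Hypothetical transaction record; real field names may differ."""
    tx_hash: str
    address: str
    contract: str
    direction: str
    timestamp: int


class DictDB:
    """In-memory store exposing the three adapter methods."""

    def __init__(self) -> None:
        self._by_hash = {}

    async def save_transaction(self, transaction: Tx) -> None:
        # Keyed by hash, so saving the same transaction twice is a no-op
        self._by_hash.setdefault(transaction.tx_hash, transaction)

    async def get_latest_transaction(self, address, contract, direction) -> Optional[Tx]:
        matches = [
            tx for tx in self._by_hash.values()
            if (tx.address, tx.contract, tx.direction) == (address, contract, direction)
        ]
        # The newest match by timestamp, or None when nothing matches
        return max(matches, key=lambda tx: tx.timestamp, default=None)

    async def transaction_exists(self, tx_hash: str) -> bool:
        return tx_hash in self._by_hash
```

In a real adapter the same three methods would wrap queries against the backing store, with the transaction hash as the natural unique key.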

Notification Options

File Notifier

Write notifications to a file:

from trc20_monitor import FileNotifier

monitor = TRC20Monitor(
    database=MemoryDB(),
    notifier=FileNotifier("transactions.log")
)

Webhook Notifier

Send notifications to a webhook:

from trc20_monitor import WebhookNotifier

monitor = TRC20Monitor(
    database=MemoryDB(),
    notifier=WebhookNotifier(
        url="https://your-webhook.com/endpoint",
        headers={"Authorization": "Bearer YOUR_TOKEN"}
    )
)

Custom Notifier

Create your own notification handler:

from trc20_monitor import NotificationAdapter

class EmailNotifier(NotificationAdapter):
    async def notify(self, transaction):
        # Send email notification
        await send_email(
            to="admin@example.com",
            subject=f"New transaction: {transaction.amount} USDT",
            body=f"Transaction details: {transaction}"
        )
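
The `send_email` helper above is not defined in this guide. One way to write it with only the standard library is sketched below; the SMTP host, port, and sender address are placeholders, and the blocking `smtplib` call is moved to a worker thread so it does not stall the event loop.

```python
import asyncio
import smtplib
from email.message import EmailMessage


def build_email(to: str, subject: str, body: str,
                sender: str = "monitor@example.com") -> EmailMessage:
    """Assemble a plain-text message; the sender address is a placeholder."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = to
    msg["Subject"] = subject
    msg.set_content(body)
    return msg


def _send_blocking(msg: EmailMessage, host: str, port: int) -> None:
    # smtplib is synchronous, so this runs in a worker thread
    with smtplib.SMTP(host, port) as smtp:
        smtp.send_message(msg)


async def send_email(to: str, subject: str, body: str,
                     host: str = "localhost", port: int = 25) -> None:
    """Send a message without blocking the event loop."""
    msg = build_email(to, subject, body)
    await asyncio.to_thread(_send_blocking, msg, host, port)
```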

Multiple Notifiers

Use multiple notification channels:

from trc20_monitor import CompositeNotifier

notifier = CompositeNotifier([
    ConsoleNotifier(),
    FileNotifier("transactions.log"),
    WebhookNotifier(url="https://webhook.com/endpoint")
])

monitor = TRC20Monitor(
    database=MemoryDB(),
    notifier=notifier
)
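
For intuition, a fan-out notifier can be sketched in a few lines. This is not necessarily how `CompositeNotifier` is implemented; it only assumes that each wrapped object has an async `notify(transaction)` method, as in the custom notifier example.

```python
import asyncio


class FanOutNotifier:
    """Forward each transaction to every wrapped notifier."""

    def __init__(self, notifiers):
        self.notifiers = list(notifiers)

    async def notify(self, transaction):
        # return_exceptions=True keeps one failing channel from hiding the others
        results = await asyncio.gather(
            *(n.notify(transaction) for n in self.notifiers),
            return_exceptions=True,
        )
        # Hand the errors back so the caller can log them
        return [r for r in results if isinstance(r, Exception)]
```

Collecting errors instead of raising means a webhook outage would not prevent the console or file channel from receiving the same transaction.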

Error Handling

Retry Logic

The monitor retries failed requests automatically. Set the number of retries and the delay between them:

monitor = TRC20Monitor(
    database=MemoryDB(),
    notifier=ConsoleNotifier(),
    max_retries=5,  # Retry failed requests up to 5 times
    retry_delay=2    # Wait 2 seconds between retries
)
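
The two settings map onto a simple fixed-delay retry loop. The sketch below illustrates that pattern only; it is not the library's actual implementation, and `with_retries` is a hypothetical helper name.

```python
import asyncio


async def with_retries(make_call, max_retries: int = 5, retry_delay: float = 2.0):
    """Await make_call(); on failure, retry up to max_retries more times."""
    attempt = 0
    while True:
        try:
            return await make_call()
        except Exception:
            if attempt >= max_retries:
                raise  # out of retries, surface the last error
            attempt += 1
            await asyncio.sleep(retry_delay)
```

With `max_retries=5` this makes at most six calls in total: the initial attempt plus five retries.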

Custom Error Handling

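# Assumes both exception classes are exported by trc20_monitor;
# adjust the import if the package defines them elsewhere.
from trc20_monitor import MonitorError, NetworkError

try:
    await monitor.monitor_address(
        address="TYourAddress",
        contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t"
    )
except NetworkError as e:
    # Listed first so it still matches if NetworkError subclasses MonitorError
    print(f"Network error: {e}")
except MonitorError as e:
    print(f"Monitoring error: {e}")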

Using the CLI

Basic Monitoring

# Monitor incoming USDT transactions
trc20-monitor monitor TYourAddress TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t

# Monitor outgoing transactions
trc20-monitor monitor TYourAddress TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t --direction out

# Monitor both directions
trc20-monitor monitor TYourAddress TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t --direction both

With Custom Storage

# Use SQLite database
trc20-monitor monitor TYourAddress TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t \
    --db-type sqlite --db-path transactions.db

With Notifications

# File notifications
trc20-monitor monitor TYourAddress TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t \
    --notifier file --file-path alerts.log

# Webhook notifications
trc20-monitor monitor TYourAddress TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t \
    --notifier webhook --webhook-url https://webhook.com/endpoint

Advanced Options

# Full example with all options
trc20-monitor monitor TYourAddress TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t \
    --direction both \
    --interval 10 \
    --min-amount 100 \
    --duration 3600 \
    --db-type sqlite \
    --db-path transactions.db \
    --notifier webhook \
    --webhook-url https://webhook.com/endpoint

Performance Optimization

Batch Processing

Process multiple addresses simultaneously:

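async def monitor_multiple():
    addresses = ["TAddr1", "TAddr2", "TAddr3"]
    coroutines = []

    for address in addresses:
        # One monitor per address, each with its own storage and notifier
        monitor = TRC20Monitor(
            database=MemoryDB(),
            notifier=ConsoleNotifier()
        )
        # monitor_address() returns a coroutine; it starts running inside gather()
        coroutines.append(
            monitor.monitor_address(
                address=address,
                contract="TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t"
            )
        )

    # Run all monitors concurrently until every one finishes
    await asyncio.gather(*coroutines)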
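
By default `asyncio.gather` raises the first exception it sees, which makes it hard to tell which address failed. A hedged variation is to pass `return_exceptions=True` and key the outcomes by address. The helper name `run_all` is hypothetical, and the sketch works with any coroutines, not only monitor calls.

```python
import asyncio


async def run_all(jobs: dict) -> dict:
    """Run named coroutines concurrently; map each name to its result or exception."""
    results = await asyncio.gather(*jobs.values(), return_exceptions=True)
    return dict(zip(jobs.keys(), results))
```

With the example above, `jobs` could map each address to its `monitor.monitor_address(...)` coroutine, and the returned dictionary would show which addresses ended in an error.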

Connection Pooling

For high-volume monitoring, set a connection pool size:

monitor = TRC20Monitor(
    database=SQLiteDB("transactions.db"),
    notifier=ConsoleNotifier(),
    connection_pool_size=10  # Use connection pooling
)

Best Practices

  1. Use persistent storage for production environments

  2. Implement proper error handling and logging

  3. Set appropriate intervals to avoid rate limiting

  4. Use webhooks for real-time notifications

  5. Monitor system resources when running multiple monitors

  6. Implement data retention policies for large-scale deployments

  7. Use environment variables for sensitive configuration
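
As an illustration of the last point, webhook settings can be read from the environment instead of being hard-coded. The variable names `TRC20_WEBHOOK_URL` and `TRC20_WEBHOOK_TOKEN` are made up for this sketch; use whatever names fit your deployment.

```python
import os


def webhook_settings(env=os.environ) -> dict:
    """Read webhook settings from the environment; variable names are illustrative."""
    url = env["TRC20_WEBHOOK_URL"]          # required: raises KeyError if missing
    token = env.get("TRC20_WEBHOOK_TOKEN")  # optional bearer token
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    return {"url": url, "headers": headers}
```

The result has the same `url` and `headers` keys as the keyword arguments in the Webhook Notifier example, so it could be passed as `WebhookNotifier(**webhook_settings())`.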