# Content Moderation and Spam Prevention
This tutorial teaches you how to build a content moderation system for your federated application. You'll learn to block malicious domains, filter spam content, implement rate limiting, and create a moderation queue.
By the end of this tutorial, you'll have a working moderation system that protects your users from spam and abuse while maintaining a healthy federated community.
## What You'll Build
- Domain blocking system with admin interface
- Automatic spam detection based on content patterns
- Rate limiting to prevent abuse
- Moderation queue for suspicious activities
- User-level blocking
## Prerequisites
- Completed the Getting Started tutorial
- Understanding of Reference and Context Architecture
- Familiarity with Django signals and models
## Understanding the Moderation Architecture
The toolkit provides three levels where you can enforce moderation policies:
- Inbox view - Blocks domains before creating notifications (returns 403)
- Document processors - Filter activities before they're parsed (can raise `DropMessage`)
- Signal handlers - React to activities after processing (for automatic blocking)
In signal handlers, `activity.actor` and `activity.object` are `Reference` objects, not full context models. To access the actual data, resolve them with `.get_by_context()`.
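For example, turning an `Activity` row into readable output might look like this (a minimal sketch: `obj.content` and `actor.published` are fields used later in this tutorial; the rest is illustrative):

```python
from activitypub.core.models import ActorContext, ObjectContext

def inspect_activity(activity):
    # activity.actor and activity.object are Reference rows (essentially
    # URIs plus metadata); get_by_context() resolves them to parsed models.
    actor = activity.actor.get_by_context(ActorContext)
    obj = activity.object.get_by_context(ObjectContext) if activity.object else None
    if actor:
        print("actor published:", actor.published)
    if obj:
        print("object content:", obj.content)
```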
## Set Up Domain Blocking
Start by adding domain blocking to your Django admin.
Create `yourapp/admin.py`:

```python
from django.contrib import admin

from activitypub.core.models import Domain


@admin.register(Domain)
class DomainAdmin(admin.ModelAdmin):
    list_display = ('name', 'local', 'blocked', 'is_active')
    list_filter = ('local', 'blocked', 'is_active')
    search_fields = ('name',)
    actions = ['block_domains', 'unblock_domains']

    def block_domains(self, request, queryset):
        count = queryset.update(blocked=True)
        self.message_user(request, f"Blocked {count} domains")

    def unblock_domains(self, request, queryset):
        count = queryset.update(blocked=False)
        self.message_user(request, f"Unblocked {count} domains")
```
Test it by running your development server and navigating to the admin interface. Block a test domain and verify that activities from that domain are rejected with 403 Forbidden.
The toolkit automatically checks `actor_reference.domain.blocked` in the inbox view before processing any activity.
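You can also verify this in an automated test. The sketch below is hedged: the inbox path and minimal payload are assumptions for illustration, the `Domain` constructor arguments are inferred from the admin fields above, and a real inbox may reject unsigned requests before the domain check runs:

```python
import json

from django.test import TestCase

from activitypub.core.models import Domain


class BlockedDomainTests(TestCase):
    def test_activity_from_blocked_domain_is_rejected(self):
        Domain.objects.create(name="spam.example", blocked=True)  # assumed fields
        response = self.client.post(
            "/users/alice/inbox",  # hypothetical inbox path for your app
            data=json.dumps({
                "@context": "https://www.w3.org/ns/activitystreams",
                "id": "https://spam.example/activities/1",
                "type": "Create",
                "actor": "https://spam.example/users/bot",
            }),
            content_type="application/activity+json",
        )
        self.assertEqual(response.status_code, 403)
```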
## Create Automatic Spam Detection
Now you'll build a system that automatically blocks domains sending spam.
Create `yourapp/moderation.py`:

```python
import logging
from datetime import timedelta

from django.dispatch import receiver
from django.utils import timezone

from activitypub.core.models import Activity, ObjectContext
from activitypub.core.signals import activity_done

logger = logging.getLogger(__name__)


@receiver(activity_done)
def check_for_spam(sender, activity, **kwargs):
    # activity.actor is a Reference (ForeignKey), not an ActorContext
    # See: ../topics/reference_context_architecture.md
    if not activity.actor:
        return

    actor_domain = activity.actor.domain
    # Nothing to do if there is no domain, or it is already blocked
    if not actor_domain or actor_domain.blocked:
        return

    if is_spam_activity(activity):
        actor_domain.blocked = True
        actor_domain.save()
        logger.warning(f"Blocked domain {actor_domain.name} for spam")


def is_spam_activity(activity):
    # Check for excessive posting from the same actor (Reference)
    recent_activities = Activity.objects.filter(
        actor=activity.actor,
        published__gte=timezone.now() - timedelta(hours=1),
    ).count()
    if recent_activities > 100:
        return True

    # activity.object is also a Reference; resolve it to an ObjectContext.
    # See: ../topics/reference_context_architecture.md for Reference/Context patterns
    if activity.object:
        obj = activity.object.get_by_context(ObjectContext)
        if obj and obj.content:
            spam_keywords = ['buy now', 'click here', 'limited offer']
            content_lower = obj.content.lower()
            if any(keyword in content_lower for keyword in spam_keywords):
                return True

    return False
```
Register the handlers in `yourapp/apps.py`:

```python
from django.apps import AppConfig


class YourAppConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'yourapp'

    def ready(self):
        # Imported for its side effect: connecting the signal receivers.
        import yourapp.moderation  # noqa: F401
```
Test by creating a test activity with spam keywords. The domain should be automatically blocked.
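For a faster feedback loop while tuning the keyword list, you can extract the check into a pure function and unit-test it without the database (pytest-style; this helper mirrors the logic above rather than being part of the toolkit):

```python
# yourapp/tests/test_spam_keywords.py (illustrative)
def contains_spam_keywords(content):
    # Same heuristic as is_spam_activity, minus the ORM lookups.
    spam_keywords = ['buy now', 'click here', 'limited offer']
    return any(keyword in content.lower() for keyword in spam_keywords)


def test_flags_spam_phrases():
    assert contains_spam_keywords("BUY NOW, limited offer!")


def test_ignores_normal_content():
    assert not contains_spam_keywords("Photos from today's hike")
```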
## Add Content Filtering with Document Processors
Signal handlers run after activities are processed. To reject activities before they're even loaded, create a document processor.
Create `yourapp/processors.py`:

```python
import logging

import rdflib

from activitypub.core.contexts import AS2
from activitypub.core.exceptions import DropMessage
from activitypub.core.models import LinkedDataDocument
from activitypub.core.processors import DocumentProcessor

logger = logging.getLogger(__name__)


class SpamFilterProcessor(DocumentProcessor):
    def process_incoming(self, document):
        if not document:
            return

        try:
            g = LinkedDataDocument.get_graph(document)
            subject_uri = rdflib.URIRef(document["id"])
            obj_uri = g.value(subject=subject_uri, predicate=AS2.object)
            if obj_uri:
                content = g.value(subject=obj_uri, predicate=AS2.content)
                if content:
                    content_lower = str(content).lower()
                    spam_keywords = ['buy now', 'click here', 'limited offer']
                    if any(keyword in content_lower for keyword in spam_keywords):
                        logger.info(f"Dropping spam activity {document['id']}")
                        raise DropMessage("Spam content detected")
        except (KeyError, AssertionError):
            # Malformed documents fall through to the remaining processors.
            pass
```
Register it in `settings.py`:

```python
FEDERATION = {
    # ... other settings ...
    'DOCUMENT_PROCESSORS': [
        'activitypub.core.processors.ActorDeletionDocumentProcessor',
        'activitypub.core.processors.CompactJsonLdDocumentProcessor',
        'yourapp.processors.SpamFilterProcessor',
    ],
}
```
Document processors run before the document is parsed into context models. Raising `DropMessage` prevents any further processing and returns a "dropped" status.
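You can also exercise the processor directly in a test, assuming `process_incoming` accepts the compacted JSON-LD document as a plain dict (a sketch; whether `get_graph` handles a raw dict this way is an assumption to verify against the toolkit):

```python
import pytest

from activitypub.core.exceptions import DropMessage
from yourapp.processors import SpamFilterProcessor


def test_spam_document_is_dropped():
    document = {
        "@context": "https://www.w3.org/ns/activitystreams",
        "id": "https://spam.example/activities/1",
        "type": "Create",
        "actor": "https://spam.example/users/bot",
        "object": {
            "id": "https://spam.example/notes/1",
            "type": "Note",
            "content": "Click here for a limited offer!",
        },
    }
    with pytest.raises(DropMessage):
        SpamFilterProcessor().process_incoming(document)
```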
## Implement Rate Limiting
Prevent abuse by limiting how many activities a domain can send per hour.
Add to `yourapp/processors.py`:

```python
from django.core.cache import cache

from activitypub.core.models import Reference


def check_rate_limit(domain_name, max_requests=100, window=3600):
    cache_key = f"domain_requests_{domain_name}"
    request_count = cache.get(cache_key, 0)
    if request_count >= max_requests:
        return False
    # Note: get/set is not atomic and the TTL restarts on every request;
    # for stricter accounting, consider cache.add() plus cache.incr().
    cache.set(cache_key, request_count + 1, window)
    return True


class RateLimitProcessor(DocumentProcessor):
    def process_incoming(self, document):
        if not document:
            return

        try:
            actor_uri = document.get('actor')
            if not actor_uri:
                return

            actor_ref = Reference.make(actor_uri)
            if not actor_ref.domain:
                return

            domain_name = actor_ref.domain.name
            if not check_rate_limit(domain_name):
                logger.warning(f"Rate limit exceeded for domain {domain_name}")
                actor_ref.domain.blocked = True
                actor_ref.domain.save()
                raise DropMessage("Rate limit exceeded")
        except (KeyError, AttributeError):
            pass
```
Add it to your settings:
```python
FEDERATION = {
    'DOCUMENT_PROCESSORS': [
        'activitypub.core.processors.ActorDeletionDocumentProcessor',
        'activitypub.core.processors.CompactJsonLdDocumentProcessor',
        'yourapp.processors.SpamFilterProcessor',
        'yourapp.processors.RateLimitProcessor',
    ],
}
```
Test by sending multiple activities from the same domain rapidly. After 100 requests in an hour, the domain should be blocked.
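The counter itself is easy to verify in isolation (run under your Django test settings, which use a local-memory cache by default):

```python
from django.core.cache import cache

from yourapp.processors import check_rate_limit


def test_rate_limit_trips_after_threshold():
    cache.clear()
    # The first 100 requests in the window are allowed...
    for _ in range(100):
        assert check_rate_limit("flood.example")
    # ...and the 101st is refused.
    assert not check_rate_limit("flood.example")
```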
## Create a Moderation Queue
Build a queue for suspicious activities that require manual review.
Create `yourapp/models.py`:

```python
from django.contrib.auth.models import User
from django.db import models


class ModerationQueue(models.Model):
    activity_uri = models.CharField(max_length=2083, unique=True)
    reason = models.CharField(max_length=200)
    moderator = models.ForeignKey(
        User, null=True, blank=True, on_delete=models.SET_NULL
    )
    # NULL means pending review; True/False records the moderator's decision.
    approved = models.BooleanField(null=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ['-created_at']

    def __str__(self):
        return f"{self.activity_uri} - {self.reason}"
```
Add the processor to `yourapp/processors.py`:

```python
from datetime import timedelta

from django.utils import timezone

from activitypub.core.models import ActorContext
from yourapp.models import ModerationQueue


class ModerationQueueProcessor(DocumentProcessor):
    def process_incoming(self, document):
        if not document:
            return

        try:
            g = LinkedDataDocument.get_graph(document)
            subject_uri = rdflib.URIRef(document["id"])
            actor_uri = g.value(subject=subject_uri, predicate=AS2.actor)
            if not actor_uri:
                return

            actor_ref = Reference.make(str(actor_uri))
            if actor_ref.is_remote and not actor_ref.is_resolved:
                # get_or_create keeps redelivered activities from violating
                # the unique constraint on activity_uri.
                ModerationQueue.objects.get_or_create(
                    activity_uri=document["id"],
                    defaults={"reason": "New unresolved remote actor"},
                )
                raise DropMessage("Queued for moderation")

            actor = actor_ref.get_by_context(ActorContext)
            if actor and actor.published:
                account_age = timezone.now() - actor.published
                if account_age < timedelta(days=1):
                    ModerationQueue.objects.get_or_create(
                        activity_uri=document["id"],
                        defaults={"reason": "Actor account less than 1 day old"},
                    )
                    raise DropMessage("Queued for moderation")
        except (KeyError, AttributeError):
            pass
```
Create an admin interface in `yourapp/admin.py`:

```python
from yourapp.models import ModerationQueue


@admin.register(ModerationQueue)
class ModerationQueueAdmin(admin.ModelAdmin):
    list_display = ('activity_uri', 'reason', 'approved', 'moderator', 'created_at')
    list_filter = ('approved', 'created_at')
    search_fields = ('activity_uri', 'reason')
    actions = ['approve_activities', 'reject_activities']

    def approve_activities(self, request, queryset):
        count = queryset.update(approved=True, moderator=request.user)
        self.message_user(request, f"Approved {count} activities")

    def reject_activities(self, request, queryset):
        count = queryset.update(approved=False, moderator=request.user)
        self.message_user(request, f"Rejected {count} activities")
```
Run migrations:
```bash
python manage.py makemigrations
python manage.py migrate
```
Test by sending an activity from a newly created actor. It should appear in the moderation queue.
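Because the queue is a plain Django model, you can also review it from `python manage.py shell` alongside the admin:

```python
from yourapp.models import ModerationQueue

# Entries still awaiting a decision (approved stays NULL until a moderator acts)
pending = ModerationQueue.objects.filter(approved__isnull=True)
print(f"{pending.count()} activities awaiting review")

# Recent rejections, newest first thanks to the model's default ordering
for entry in ModerationQueue.objects.filter(approved=False)[:10]:
    print(entry.activity_uri, "-", entry.reason)
```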
## Add User-Level Blocking
Allow users to block specific actors.
Add to `yourapp/models.py`:

```python
from activitypub.core.models import Reference


class UserBlock(models.Model):
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    blocked_actor = models.ForeignKey(Reference, on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        unique_together = ('user', 'blocked_actor')
```
Add to `yourapp/moderation.py`:

```python
from yourapp.models import UserBlock


@receiver(activity_done)
def filter_blocked_actors(sender, activity, **kwargs):
    if not activity.actor:
        return

    blocked_by_users = UserBlock.objects.filter(
        blocked_actor=activity.actor
    ).exists()
    if blocked_by_users:
        # This handler only records the match; blocked actors' content
        # should also be hidden at query time when rendering timelines.
        logger.info(f"Activity from {activity.actor.uri} blocked by user preference")
```
Run migrations and test by creating a user block and verifying that activities from that actor are logged.
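Since the signal handler only logs, you'll usually also want to honor blocks at read time. One way to do that (a sketch, assuming you list activities by their actor `Reference`, as in the spam check earlier):

```python
from activitypub.core.models import Activity
from yourapp.models import UserBlock


def activities_visible_to(user):
    # Collect the Reference ids this user has blocked...
    blocked = UserBlock.objects.filter(user=user).values_list(
        'blocked_actor', flat=True
    )
    # ...and exclude anything those actors sent.
    return Activity.objects.exclude(actor__in=blocked)
```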
## Add Monitoring and Alerts
Send email alerts when domains are automatically blocked.
Add to `yourapp/moderation.py`:

```python
from django.conf import settings
from django.core.mail import send_mail


def send_block_alert(domain):
    send_mail(
        subject=f"Domain {domain.name} has been blocked",
        message=f"Domain {domain.name} was automatically blocked.",
        from_email=settings.DEFAULT_FROM_EMAIL,
        recipient_list=settings.MODERATOR_EMAILS,
    )


@receiver(activity_done)
def alert_on_auto_block(sender, activity, **kwargs):
    if not activity.actor or not activity.actor.domain:
        return

    domain = activity.actor.domain
    if domain.blocked and is_spam_activity(activity):
        send_block_alert(domain)
```
Add to `settings.py`:

```python
MODERATOR_EMAILS = ['moderators@example.com']
```
Test by triggering an automatic block and verifying that an email is sent.
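In tests, Django swaps in an in-memory email backend, so you can assert on `mail.outbox` directly (the `Domain` constructor arguments here are assumptions, as before):

```python
from django.core import mail
from django.test import TestCase, override_settings

from activitypub.core.models import Domain
from yourapp.moderation import send_block_alert


@override_settings(MODERATOR_EMAILS=['moderators@example.com'])
class BlockAlertTests(TestCase):
    def test_alert_lands_in_outbox(self):
        domain = Domain.objects.create(name="spam.example", blocked=True)
        send_block_alert(domain)
        self.assertEqual(len(mail.outbox), 1)
        self.assertIn("spam.example", mail.outbox[0].subject)
```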
## What You've Learned
You now understand:
- How domain blocking works at the inbox level
- The difference between document processors and signal handlers
- When to use `DropMessage` vs signal-based reactions
- How to work with References vs Context models
- Building a complete moderation system with multiple layers
## Best Practices
- Start permissive - Block only when necessary
- Monitor patterns - Look for abuse trends before blocking
- Document reasons - Keep records of why domains were blocked
- Regular review - Periodically review and unblock legitimate domains
- Graduated response - Use warnings and rate limits before permanent blocks (see the sketch below)
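For the last point, a graduated response can reuse the cache-counter pattern from the rate limiter: count strikes first and only block a domain after repeated offenses (a sketch; the helper name and threshold are illustrative, not toolkit API):

```python
from django.core.cache import cache


def register_strike(domain, max_strikes=3, window=86400):
    """Escalate to a block only after repeated offenses within a day."""
    key = f"moderation_strikes_{domain.name}"
    strikes = cache.get(key, 0) + 1
    cache.set(key, strikes, window)
    if strikes >= max_strikes:
        domain.blocked = True
        domain.save()
    return strikes
```

You could then call `register_strike(actor_domain)` in `check_for_spam` instead of blocking on the first hit.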
## Next Steps
- Implement a public blocklist API for sharing blocked domains
- Add appeal process for blocked users
- Create analytics dashboard for moderation metrics
- Integrate with external spam detection services