
Alert Deduplication: Preventing Alert Storms and Notification Floods

Learn how alert deduplication works, how to implement grouping and correlation strategies, and how to prevent alert storms from overwhelming your on-call team during incidents.

AzMonitor Team · July 9, 2025 · 7 min read · 1,531 words · Updated January 20, 2026
Tags: alert deduplication, alert grouping, alert storms, incident management

When a major incident occurs, monitoring systems can generate dozens or hundreds of alerts in seconds. A single database failure might trigger alerts from the database monitor, every service that depends on the database, load balancer health checks, and synthetic monitors testing user flows — all within 60 seconds of the initial failure. Without deduplication, your on-call engineer gets 47 pages about the same incident.

Alert deduplication groups related alerts into a single incident, ensuring engineers receive one notification with full context rather than a flood of separate pages.

Why Alert Storms Happen

Alert storms are a predictable consequence of modern distributed systems:

Cascade failures: One service failure causes dependent services to fail, and each of them generates its own alerts.

Monitoring redundancy: Multiple monitors check the same service (synthetic, RUM, infrastructure). When the service fails, they all fire.

Aggressive thresholds: Low thresholds and short evaluation windows create many alerts from transient issues that would have resolved on their own.

No grouping logic: Each alert fires independently, with no awareness of related alerts.

The impact: an engineer opens their laptop to 50 unread PagerDuty notifications and spends the first 10 minutes working out what is happening rather than fixing it.
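
Of the causes above, aggressive thresholds are often the cheapest to address. As a minimal sketch (the class name `ConsecutiveBreachRule` and its parameters are hypothetical, not part of any monitoring product), a rule can require several consecutive breaching samples before it fires, so a single transient spike never pages anyone:

```python
from collections import deque


class ConsecutiveBreachRule:
    """Fire only after `required` consecutive samples exceed `threshold`.

    Hypothetical helper for illustration; names and defaults are assumptions.
    """

    def __init__(self, threshold: float, required: int = 3):
        self.threshold = threshold
        self.required = required
        # Keep only the most recent `required` breach flags
        self.recent = deque(maxlen=required)

    def observe(self, value: float) -> bool:
        """Record one sample; return True if the rule should fire."""
        self.recent.append(value > self.threshold)
        return len(self.recent) == self.required and all(self.recent)


rule = ConsecutiveBreachRule(threshold=500.0, required=3)
latencies_ms = [120, 900, 130, 650, 700, 820]
fired = [rule.observe(v) for v in latencies_ms]
```

Here the isolated 900 ms spike is ignored, and the rule fires only on the last sample, once three readings in a row are above 500 ms.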

Deduplication Strategies

1. Time-Window Grouping

Group alerts that fire within the same time window:

from datetime import datetime, timezone


class AlertDeduplicator:
    """
    Deduplicate alerts that fire within close temporal proximity.

    create_incident() and check_shared_infrastructure() are integration
    points that depend on your incident store and infrastructure inventory;
    they are not shown here.
    """

    def __init__(self, service_map, dedup_window_seconds=300):
        # service_map must provide get_dependencies() and get_dependents()
        self.service_map = service_map
        self.dedup_window = dedup_window_seconds  # 5 minutes default
        self.active_incidents = {}

    def process_alert(self, alert):
        """
        Process an incoming alert, deduplicating against active incidents.
        Returns (incident, is_new): the incident the alert belongs to and
        whether that incident was created by this alert.
        """
        # Check if this alert matches an active incident
        matching_incident = self.find_matching_incident(alert)

        if matching_incident:
            # Add alert to existing incident
            matching_incident.add_alert(alert)
            return matching_incident, False  # False = not a new incident
        else:
            # Create new incident for this alert
            new_incident = self.create_incident(alert)
            self.active_incidents[new_incident.id] = new_incident
            return new_incident, True  # True = new incident

    def find_matching_incident(self, alert):
        """
        Find an active incident that this alert should be grouped with.
        Matching criteria: same service OR related services within time window.
        """
        # Timezone-aware UTC; incident.started_at must be timezone-aware too
        now = datetime.now(timezone.utc)
        
        for incident in self.active_incidents.values():
            if incident.status == "resolved":
                continue
            
            # Time-based: alert within dedup window of incident start
            time_since_incident = (now - incident.started_at).total_seconds()
            if time_since_incident > self.dedup_window:
                continue
            
            # Service-based: same service or explicitly related service
            if alert.service in incident.affected_services:
                return incident
            
            # Correlation-based: alert is likely caused by same root issue
            if self.is_likely_correlated(alert, incident):
                return incident
        
        return None
    
    def is_likely_correlated(self, alert, incident):
        """
        Determine if an alert is likely related to an existing incident.
        """
        # Check service dependency graph
        for affected_service in incident.affected_services:
            dependencies = self.service_map.get_dependencies(affected_service)
            if alert.service in dependencies:
                return True
            
            dependents = self.service_map.get_dependents(affected_service)
            if alert.service in dependents:
                return True
        
        # Check shared infrastructure
        shared_infra = self.check_shared_infrastructure(alert.service, incident.affected_services)
        if shared_infra:
            return True
        
        return False
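
To see the grouping rule in action without an incident store, the following is a self-contained sketch of the same idea. The `Alert` and `Incident` dataclasses, the `group_alerts` function, and the `related` map are all hypothetical stand-ins, and timestamps are passed in explicitly so the result is reproducible:

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


@dataclass
class Alert:
    service: str
    triggered_at: datetime


@dataclass
class Incident:
    started_at: datetime
    affected_services: set = field(default_factory=set)
    alerts: list = field(default_factory=list)


def group_alerts(alerts, related, window_seconds=300):
    """Group alerts into incidents by time window and service relation.

    `related` maps a service to the set of services considered related
    to it (an assumed stand-in for a real dependency graph).
    """
    incidents = []
    for alert in sorted(alerts, key=lambda a: a.triggered_at):
        match = None
        for incident in incidents:
            age = (alert.triggered_at - incident.started_at).total_seconds()
            if age > window_seconds:
                continue  # incident is too old to absorb new alerts
            linked = any(
                alert.service == s
                or alert.service in related.get(s, set())
                or s in related.get(alert.service, set())
                for s in incident.affected_services
            )
            if linked:
                match = incident
                break
        if match is None:
            match = Incident(started_at=alert.triggered_at)
            incidents.append(match)
        match.affected_services.add(alert.service)
        match.alerts.append(alert)
    return incidents


t0 = datetime(2025, 7, 9, 12, 0, tzinfo=timezone.utc)
related = {"database": {"checkout-api", "payment-service"}}
alerts = [
    Alert("database", t0),
    Alert("checkout-api", t0 + timedelta(seconds=20)),
    Alert("payment-service", t0 + timedelta(seconds=45)),
    Alert("email-worker", t0 + timedelta(seconds=50)),
    Alert("database", t0 + timedelta(minutes=10)),
]
incidents = group_alerts(alerts, related)
```

The first three alerts collapse into one incident. The unrelated `email-worker` alert gets its own incident, and so does the second `database` alert, because it arrives after the 5-minute window has closed.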

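2. Service Dependency Correlation

Correlate alerts by walking a service dependency map, so that a failing dependency and its failing callers land in the same incident: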

class ServiceDependencyGraph:
    """
    Track service dependencies to correlate related alerts.
    """
    
    def __init__(self):
        self.dependencies = {
            # service: [services it depends on]
            "checkout-api": ["payment-service", "inventory-service", "database"],
            "payment-service": ["stripe-api", "database"],
            "inventory-service": ["database"],
            "frontend": ["checkout-api", "user-api", "auth-service"],
            "auth-service": ["database", "redis-cache"],
        }
    
    def find_root_cause_candidates(self, failing_services):
        """
        Given a set of failing services, identify likely root causes.
        The root cause service is one that:
        1. Is in the failing set
        2. Other failing services depend on it
        3. Doesn't itself depend on other failing services
        """
        candidates = []
        
        for service in failing_services:
            service_deps = self.dependencies.get(service, [])
            
            # Check if this service's dependencies include other failing services
            failing_deps = [d for d in service_deps if d in failing_services]
            
            if not failing_deps:
                # This service isn't failing because of another failing service
                # It could be the root cause
                candidates.append({
                    "service": service,
                    "other_services_depending_on_it": [
                        s for s in failing_services
                        if service in self.dependencies.get(s, [])
                    ]
                })
        
        # Sort by number of dependent failing services (most likely root cause first)
        return sorted(candidates, key=lambda c: len(c["other_services_depending_on_it"]), reverse=True)
    
    def get_likely_cascade(self, root_service):
        """
        Predict which services will fail if root_service goes down.
        """
        cascade = []
        
        def find_dependents(service):
            for svc, deps in self.dependencies.items():
                if service in deps and svc not in cascade:
                    cascade.append(svc)
                    find_dependents(svc)
        
        find_dependents(root_service)
        return cascade
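
The time-window deduplicator calls `get_dependencies()` and `get_dependents()` on its service map, which the class above does not define. One possible shape for those lookups, together with a compact version of the root-cause ranking, is sketched below. The function names and the module-level `DEPENDENCIES` dict are assumptions made for illustration:

```python
DEPENDENCIES = {
    # service: [services it depends on], same shape as the class above
    "checkout-api": ["payment-service", "inventory-service", "database"],
    "payment-service": ["stripe-api", "database"],
    "inventory-service": ["database"],
    "frontend": ["checkout-api", "user-api", "auth-service"],
    "auth-service": ["database", "redis-cache"],
}


def get_dependencies(service, deps=DEPENDENCIES):
    """Services that `service` calls directly."""
    return list(deps.get(service, []))


def get_dependents(service, deps=DEPENDENCIES):
    """Services that call `service` directly."""
    return sorted(svc for svc, needs in deps.items() if service in needs)


def root_cause_candidates(failing, deps=DEPENDENCIES):
    """Failing services with no failing dependency of their own,
    ordered by how many other failing services depend on them."""
    failing = set(failing)
    scored = []
    for service in failing:
        if failing.intersection(deps.get(service, [])):
            continue  # explained by an upstream failure
        blast = sorted(s for s in failing if service in deps.get(s, []))
        scored.append((service, blast))
    return sorted(scored, key=lambda item: (-len(item[1]), item[0]))


failing = ["checkout-api", "payment-service", "inventory-service", "database"]
candidates = root_cause_candidates(failing)
```

With these four services failing, `database` is the only candidate, since each of the other three has a failing dependency. Note that this only considers direct dependencies; a production graph would likely need transitive lookups as well.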

3. PagerDuty Intelligent Alert Grouping

PagerDuty's built-in grouping features:

# Configure a PagerDuty service with intelligent grouping via the REST API.
# The grouping type and its options live under alert_grouping_parameters;
# confirm field names against the current PagerDuty API reference before use.
service_config = {
    "service": {
        "name": "checkout-api",
        "alert_grouping_parameters": {
            "type": "intelligent"
            # PagerDuty ML-based grouping based on historical patterns
        }
    }
}

# Or use time-based grouping for simpler cases
time_based_config = {
    "service": {
        "name": "checkout-api",
        "alert_grouping_parameters": {
            "type": "time",
            "config": {
                "timeout": 5  # Minutes, not seconds: group alerts within 5 minutes
            }
        }
    }
}

# Or content-based grouping (by field values)
content_based_config = {
    "service": {
        "name": "checkout-api",
        "alert_grouping_parameters": {
            "type": "content_based",
            "config": {
                "aggregate": "all",  # Alerts must match on every listed field
                "fields": ["severity", "source"]  # Group alerts with same severity and source
            }
        }
    }
}
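
To make the content-based option concrete, here is a rough local model of grouping on field values. It is an illustration only and not PagerDuty's implementation; the function name `group_by_fields` and the alert dict shape are assumptions:

```python
from collections import defaultdict


def group_by_fields(alerts, fields):
    """Group alert dicts that share the same value for every listed field."""
    groups = defaultdict(list)
    for alert in alerts:
        key = tuple(alert.get(name) for name in fields)
        groups[key].append(alert)
    return dict(groups)


alerts = [
    {"summary": "p99 latency high", "severity": "critical", "source": "checkout-api"},
    {"summary": "5xx rate high", "severity": "critical", "source": "checkout-api"},
    {"summary": "disk usage 80%", "severity": "warning", "source": "checkout-api"},
]
groups = group_by_fields(alerts, ["severity", "source"])
```

The two critical alerts from `checkout-api` share a group, while the warning stays separate. Choosing fewer fields produces broader groups, and choosing more fields produces narrower ones.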

Implementing Dedup Keys

Deduplication keys (dedup keys) prevent the same alert from creating multiple PagerDuty incidents:

import requests


def send_alert_with_dedup(
    pd_integration_key: str,
    alert_data: dict
) -> dict:
    """
    Send alert to PagerDuty with deduplication key.
    
    Using the same dedup_key for related alerts ensures they're
    grouped into one incident.
    """
    # Create a stable dedup key for this type of alert
    # Same dedup key = same PagerDuty incident
    dedup_key = generate_dedup_key(alert_data)
    
    payload = {
        "routing_key": pd_integration_key,
        "event_action": "trigger",
        "dedup_key": dedup_key,  # Critical for deduplication
        "payload": {
            "summary": alert_data["summary"],
            "severity": alert_data["severity"],
            "source": alert_data["source"],
            "custom_details": alert_data.get("details", {})
        }
    }
    
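    response = requests.post(
        "https://events.pagerduty.com/v2/enqueue",
        json=payload,
        timeout=10  # Don't let a slow API call block the alerting path
    )
    response.raise_for_status()  # Surface rejected events instead of ignoring them
    
    return {
        "dedup_key": dedup_key,
        "status": response.json().get("status")
    }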

def generate_dedup_key(alert_data: dict) -> str:
    """
    Generate a stable deduplication key for an alert.
    
    Same service + same alert type = same dedup key during an incident.
    This prevents alert storms from creating multiple PD incidents.
    """
    import hashlib
    
    # Include service name and alert type, but NOT timestamp
    # (we want same-type alerts to share a dedup key)
    key_components = [
        alert_data.get("service", ""),
        alert_data.get("alert_type", ""),
        alert_data.get("environment", "production")
    ]
    
    key_string = "|".join(key_components)
    return hashlib.md5(key_string.encode()).hexdigest()[:16]

# Resolution: use the same dedup key
def resolve_alert(pd_integration_key: str, dedup_key: str):
    """
    Resolve an alert using its dedup key.
    PagerDuty resolves the open alert with this key; the incident
    resolves once all of its alerts are resolved.
    """
    payload = {
        "routing_key": pd_integration_key,
        "event_action": "resolve",
        "dedup_key": dedup_key
    }
    
    response = requests.post(
        "https://events.pagerduty.com/v2/enqueue",
        json=payload,
        timeout=10
    )
    response.raise_for_status()
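
The property that matters is that a key is stable across repeats of the same alert and distinct across alert types. The sketch below checks that offline by building trigger and resolve bodies without sending them. The helper names `dedup_key_for` and `build_event` and the placeholder routing key are hypothetical. It hashes with SHA-256 rather than MD5 only because MD5 can be unavailable on FIPS-restricted Python builds; either works for a non-security key:

```python
import hashlib


def dedup_key_for(service, alert_type, environment="production"):
    """Stable key: the same inputs always hash to the same 16 hex characters."""
    raw = "|".join([service, alert_type, environment])
    return hashlib.sha256(raw.encode()).hexdigest()[:16]


def build_event(routing_key, action, key, summary=None, source=None):
    """Build an Events API v2 body without sending it."""
    event = {"routing_key": routing_key, "event_action": action, "dedup_key": key}
    if action == "trigger":
        # Only trigger events carry a payload in this sketch
        event["payload"] = {"summary": summary, "source": source, "severity": "critical"}
    return event


key_a = dedup_key_for("checkout-api", "high_error_rate")
key_b = dedup_key_for("checkout-api", "high_error_rate")
key_c = dedup_key_for("checkout-api", "high_latency")

trigger = build_event(
    "YOUR_ROUTING_KEY", "trigger", key_a,
    summary="checkout-api error rate above 5%", source="checkout-api",
)
resolve = build_event("YOUR_ROUTING_KEY", "resolve", key_a)
```

`key_a` and `key_b` are identical, so a repeated error-rate alert updates the existing alert instead of opening a new one, while `key_c` differs and is tracked separately. The resolve event reuses `key_a`, which is what ties it back to the original trigger.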

Alert Storm Detection and Suppression

When a massive alert storm starts, automatically suppress secondary alerts:

from datetime import datetime, timedelta, timezone


class AlertStormDetector:
    """
    Detect and handle alert storms.
    When a storm is detected, suppress secondary alerts and
    create a single aggregated incident.
    """
    
    STORM_THRESHOLD = 10  # 10 or more alerts in 2 minutes = storm
    STORM_WINDOW_SECONDS = 120
    
    def __init__(self, pagerduty):
        # pagerduty: client object exposing create_incident().
        # add_to_storm_incident() is an integration point not shown here.
        self.pagerduty = pagerduty
        self.recent_alerts = []
        self.storm_active = False
        self.storm_incident_id = None
    
    def process_alert(self, alert):
        """Check if we're in a storm; suppress or pass through."""
        # Timezone-aware UTC; alert.triggered_at must be timezone-aware too
        now = datetime.now(timezone.utc)
        
        # Remove old alerts outside the window
        cutoff = now - timedelta(seconds=self.STORM_WINDOW_SECONDS)
        self.recent_alerts = [a for a in self.recent_alerts if a.triggered_at > cutoff]
        self.recent_alerts.append(alert)
        
        # Detect storm start
        if len(self.recent_alerts) >= self.STORM_THRESHOLD and not self.storm_active:
            self.storm_active = True
            self.storm_incident_id = self.create_storm_incident(self.recent_alerts)
            return "storm_started", self.storm_incident_id
        
        # End the storm once the alert rate falls back below the threshold;
        # otherwise every later alert would be suppressed indefinitely
        if self.storm_active and len(self.recent_alerts) < self.STORM_THRESHOLD:
            self.storm_active = False
            self.storm_incident_id = None
        
        # If storm is active, suppress individual alerts
        if self.storm_active:
            self.add_to_storm_incident(self.storm_incident_id, alert)
            return "suppressed", self.storm_incident_id
        
        return "pass_through", None
    
    def create_storm_incident(self, alerts):
        """Create a single aggregated incident for a storm."""
        services_affected = list(set(a.service for a in alerts))
        
        summary = (
            f"ALERT STORM: {len(alerts)} alerts in {self.STORM_WINDOW_SECONDS}s — "
            f"possible major incident. Affected services: {', '.join(services_affected)}"
        )
        
        # Page on-call with storm summary instead of individual alerts
        return self.pagerduty.create_incident(
            summary=summary,
            severity="critical",
            details={
                "storm_alert_count": len(alerts),
                "affected_services": services_affected,
                "first_alert": min(a.triggered_at for a in alerts).isoformat(),
                "note": "Individual alerts suppressed. Investigate root cause, not individual symptoms."
            }
        )
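
The sliding-window logic is easier to reason about when the clock is passed in rather than read from the system. The following is a simplified sketch under that assumption; the class name `SlidingStormWindow` is hypothetical, it tracks only timestamps, and it assumes alerts arrive in timestamp order:

```python
from collections import deque
from datetime import datetime, timedelta, timezone


class SlidingStormWindow:
    """Count alerts in a sliding window and report storm state."""

    def __init__(self, threshold=10, window_seconds=120):
        self.threshold = threshold
        self.window = timedelta(seconds=window_seconds)
        self.timestamps = deque()
        self.storm_active = False

    def observe(self, triggered_at):
        """Record one alert timestamp; return the action to take."""
        self.timestamps.append(triggered_at)
        cutoff = triggered_at - self.window
        # Drop timestamps that have aged out of the window
        while self.timestamps and self.timestamps[0] <= cutoff:
            self.timestamps.popleft()

        in_window = len(self.timestamps)
        if not self.storm_active and in_window >= self.threshold:
            self.storm_active = True
            return "storm_started"
        if self.storm_active and in_window < self.threshold:
            self.storm_active = False  # rate dropped: the storm is over
            return "pass_through"
        return "suppressed" if self.storm_active else "pass_through"


t0 = datetime(2025, 7, 9, 12, 0, tzinfo=timezone.utc)
detector = SlidingStormWindow(threshold=10, window_seconds=120)

# 12 alerts, 5 seconds apart, then one straggler 10 minutes later
burst = [detector.observe(t0 + timedelta(seconds=5 * i)) for i in range(12)]
late = detector.observe(t0 + timedelta(minutes=10))
```

The first nine alerts pass through, the tenth starts the storm, and the next two are suppressed. The straggler arrives after the window has emptied, so the storm ends and the alert is delivered normally. Note that the alerts before the threshold have already paged individually; a lower threshold trades earlier suppression for more false storms.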

Flap Detection

An alert that rapidly alternates between firing and resolving is said to be "flapping". Counting state changes within a time window is one way to detect it:

from datetime import datetime, timedelta, timezone


class FlapDetector:
    """
    Detect flapping alerts and apply suppression or aggregation.
    
    Flapping: alert fires, resolves, fires, resolves in quick succession.
    Often indicates threshold set too aggressively or a transient issue.
    """
    
    def __init__(self, flap_window_minutes=30, flap_threshold=5):
        self.flap_window = flap_window_minutes
        self.flap_threshold = flap_threshold  # N state changes = flapping
        self.alert_history = {}
    
    def check_for_flapping(self, alert_name, current_status):
        """
        Check if an alert is flapping.
        Returns a dict whose "flapping" key is True when flapping is detected.
        """
        now = datetime.now(timezone.utc)
        cutoff = now - timedelta(minutes=self.flap_window)
        
        if alert_name not in self.alert_history:
            self.alert_history[alert_name] = []
        
        history = self.alert_history[alert_name]
        
        # Remove events outside window
        history = [(t, s) for t, s in history if t > cutoff]
        history.append((now, current_status))
        self.alert_history[alert_name] = history
        
        # Count state transitions
        transitions = sum(
            1 for i in range(1, len(history))
            if history[i][1] != history[i-1][1]
        )
        
        is_flapping = transitions >= self.flap_threshold
        
        if is_flapping:
            return {
                "flapping": True,
                "transitions": transitions,
                "window_minutes": self.flap_window,
                "recommendation": (
                    f"Alert '{alert_name}' has changed state {transitions} times in "
                    f"{self.flap_window} minutes. Consider: (1) raising threshold, "
                    f"(2) using longer evaluation window, or (3) investigating root cause."
                )
            }
        
        return {"flapping": False}
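
The transition count at the heart of the detector can be checked in isolation. This sketch uses a hypothetical `count_transitions` function with explicit timestamps, so the same event list can be evaluated at two different points in time:

```python
from datetime import datetime, timedelta, timezone


def count_transitions(events, now, window_minutes=30):
    """Count status changes among (timestamp, status) events in the window."""
    cutoff = now - timedelta(minutes=window_minutes)
    recent = [status for ts, status in sorted(events) if ts > cutoff]
    return sum(1 for prev, cur in zip(recent, recent[1:]) if prev != cur)


t0 = datetime(2025, 7, 9, 12, 0, tzinfo=timezone.utc)
statuses = ["firing", "resolved", "firing", "resolved", "firing", "resolved"]
events = [(t0 + timedelta(minutes=2 * i), s) for i, s in enumerate(statuses)]

now = t0 + timedelta(minutes=12)
transitions = count_transitions(events, now)
is_flapping = transitions >= 5

# The same events viewed 90 minutes after the first one fall outside the window
later = count_transitions(events, t0 + timedelta(minutes=90))
```

Six alternating events produce five transitions, which meets the default threshold of 5. Evaluated later, the window is empty and the count drops to zero, so the alert is no longer treated as flapping.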

Conclusion

Alert deduplication is the difference between an incident that one engineer handles efficiently and an alert storm that paralyzes a team. Whichever mechanism you use (PagerDuty's built-in grouping, service dependency correlation, or custom dedup keys), it determines whether your on-call experience is manageable or overwhelming.

Start with consistent dedup keys in your monitoring integrations and PagerDuty's intelligent grouping for service-level deduplication. For complex distributed systems, build service dependency awareness to automatically identify when multiple alerts share a root cause. AzMonitor sends alerts with consistent deduplication context, so that one incident creates one page rather than a storm of notifications.

AzMonitor Team
The AzMonitor team writes guides based on experience monitoring millions of endpoints daily across 10,000+ customer environments. Our expertise covers uptime monitoring, SRE practices, and reliability engineering.