
Designing fallback routing for disconnected field devices

Field connectivity on active construction sites remains inherently unreliable. Cellular dead zones, RF interference from heavy machinery, and temporary network outages routinely interrupt data transmission from tablets, IoT sensors, and wearable safety monitors. When a device drops offline, the routing layer must transition from synchronous API calls to a deterministic, locally cached fallback mechanism. This ensures that critical payloads—change orders, RFIs, daily logs, and budget impact assessments—retain their intended routing paths without data loss or compliance violations. The foundation of this resilience lies in a structured approach to Construction Data Architecture & Taxonomy, which dictates how payloads are classified, prioritized, and ultimately delivered once connectivity is restored.

Routing Logic & Queue Architecture

Fallback routing requires a strict separation between ingestion, local persistence, and dispatch. When a field device detects network degradation, the client must immediately serialize the payload, attach immutable metadata (device ID, local timestamp, WBS code, budget classification, and schema version), and push it to a persistent local queue. The routing engine evaluates each queued item against a deterministic rule set before attempting transmission.
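
The capture step can be sketched as a frozen dataclass that is built once, at the moment the payload is recorded. This is a minimal illustration, not the article's implementation: the names `QueueEnvelope`, `build_envelope`, and `envelope_id` are hypothetical, and the random UUID is only one possible identifier scheme.

```python
import json
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class QueueEnvelope:
    """Metadata fixed at capture time (hypothetical field names)."""
    envelope_id: str
    device_id: str
    local_timestamp: float
    wbs_code: str
    budget_code: str
    schema_version: str
    body_json: str  # serialized once so later stages cannot mutate the body


def build_envelope(device_id: str, wbs_code: str, budget_code: str,
                   schema_version: str, body: Dict[str, Any]) -> QueueEnvelope:
    """Serialize the body and attach metadata before the item is queued."""
    return QueueEnvelope(
        envelope_id=uuid.uuid4().hex,
        device_id=device_id,
        local_timestamp=time.time(),
        wbs_code=wbs_code,
        budget_code=budget_code,
        schema_version=schema_version,
        # sort_keys gives a stable byte representation for later hashing
        body_json=json.dumps(body, sort_keys=True),
    )
```

Because the dataclass is frozen, any later attempt to reassign a metadata field raises `dataclasses.FrozenInstanceError`, which makes accidental mutation between ingestion and dispatch visible during testing.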

Priority is derived from payload type and financial impact. Safety incidents and change orders exceeding predefined thresholds bypass standard queues and trigger immediate escalation upon reconnection. Standard RFIs and submittal metadata follow FIFO ordering with strict schema validation gates. This tiered approach aligns directly with Fallback Alert Routing principles, ensuring that high-impact data never stalls behind routine administrative logs.
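
One way to express this tiering is a small pure function that maps payload type and cost impact to a priority. The sketch below is an assumption-laden illustration: the `safety_incident` type name, the 10,000 USD threshold, and the choice to rank below-threshold change orders as `HIGH` are all hypothetical and would come from project configuration in practice. The enum mirrors the `PayloadPriority` values used in the implementation later in this article.

```python
from enum import IntEnum
from typing import Any, Dict


class PayloadPriority(IntEnum):
    CRITICAL = 1
    HIGH = 2
    NORMAL = 3


# Hypothetical threshold; a real value would come from project configuration.
CHANGE_ORDER_ESCALATION_USD = 10_000.0


def derive_priority(payload_type: str, data: Dict[str, Any]) -> PayloadPriority:
    """Map payload type and financial impact to a queue priority."""
    if payload_type == "safety_incident":
        return PayloadPriority.CRITICAL
    if payload_type == "change_order":
        # Use the magnitude so large credits escalate like large costs.
        cost = abs(float(data.get("cost_impact_usd", 0.0)))
        if cost > CHANGE_ORDER_ESCALATION_USD:
            return PayloadPriority.CRITICAL
        return PayloadPriority.HIGH  # assumption: still ahead of routine items
    # RFIs, submittals, daily logs: FIFO within the normal tier.
    return PayloadPriority.NORMAL
```

Keeping the rule in one function makes the rule set deterministic and easy to unit test, since the same inputs always produce the same tier.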

The architecture must also account for network flapping. Rapid oscillation between online and offline states can cause duplicate submissions or race conditions. A deterministic dispatch loop with exponential backoff, jitter, and idempotency keys prevents payload duplication while guaranteeing eventual consistency.
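
The two flapping defences can be sketched independently of the router. The first function computes a capped exponential delay with "full jitter" (a uniform draw between zero and the exponential ceiling). The second class is a simple stability gate that only reports the link as usable after it has stayed up for a minimum period. Both names, `backoff_delay` and `LinkStabilityGate`, and the default values are assumptions for illustration.

```python
import random
from typing import Optional


def backoff_delay(retry_count: int, base: float = 2.0, cap: float = 60.0,
                  rng: Optional[random.Random] = None) -> float:
    """Full-jitter backoff: uniform in [0, min(cap, base * 2**retry_count)]."""
    rng = rng or random.Random()
    ceiling = min(cap, base * (2 ** retry_count))
    return rng.uniform(0.0, ceiling)


class LinkStabilityGate:
    """Reports 'stable' only after the link has stayed up for min_up_seconds."""

    def __init__(self, min_up_seconds: float = 10.0) -> None:
        self.min_up_seconds = min_up_seconds
        self._up_since: Optional[float] = None

    def observe(self, online: bool, now: float) -> None:
        """Record a connectivity observation taken at time `now` (seconds)."""
        if not online:
            self._up_since = None      # any drop resets the stability window
        elif self._up_since is None:
            self._up_since = now       # first 'up' reading after a drop

    def is_stable(self, now: float) -> bool:
        return (self._up_since is not None
                and now - self._up_since >= self.min_up_seconds)
```

Jitter spreads reconnection attempts from many devices that regain coverage at the same moment, while the gate keeps a briefly recovered link from triggering a dispatch cycle that is likely to be cut off.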

Python Implementation

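The following implementation sketches a fallback routing engine. It uses SQLite for local persistence, enforces schema version compatibility, derives the API route from the WBS code and payload type, and retries failed dispatches with capped exponential backoff. Priority is supplied by the caller, the payload id serves as the idempotency key (enforced locally through INSERT OR IGNORE on the primary key), and the dispatch loop applies no jitter. Operations are typed, database writes are transactional, and dispatch errors are caught and routed to the retry handler.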

import sqlite3
import json
import time
import logging
import hashlib
import requests
from dataclasses import dataclass, asdict
from typing import Optional, List, Dict, Any
from enum import Enum, IntEnum
from contextlib import contextmanager
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)

class PayloadPriority(IntEnum):
    CRITICAL = 1
    HIGH = 2
    NORMAL = 3

class RoutingStatus(str, Enum):
    QUEUED = "queued"
    DISPATCHING = "dispatching"
    SYNCED = "synced"
    FAILED = "failed"

@dataclass
class FieldPayload:
    id: str
    device_id: str
    timestamp: float
    payload_type: str
    wbs_code: str
    budget_code: str
    schema_version: str
    data: Dict[str, Any]
    priority: PayloadPriority = PayloadPriority.NORMAL
    status: RoutingStatus = RoutingStatus.QUEUED
    retry_count: int = 0

class FallbackRouter:
    SUPPORTED_SCHEMAS = {"1.0", "1.1", "2.0"}
    MAX_RETRIES = 5
    BASE_DELAY = 2.0
    API_TIMEOUT = 15.0

    def __init__(self, db_path: str, api_endpoint: str, api_token: str):
        self.db_path = db_path
        self.api_endpoint = api_endpoint.rstrip("/")
        self.session = self._configure_session(api_token)
        self._init_db()

    def _configure_session(self, token: str) -> requests.Session:
        session = requests.Session()
        session.headers.update({"Authorization": f"Bearer {token}", "Content-Type": "application/json"})
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["POST"]
        )
        session.mount("https://", HTTPAdapter(max_retries=retry_strategy))
        return session

    def _init_db(self) -> None:
        with self._get_conn() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS payload_queue (
                    id TEXT PRIMARY KEY,
                    device_id TEXT NOT NULL,
                    timestamp REAL NOT NULL,
                    payload_type TEXT NOT NULL,
                    wbs_code TEXT NOT NULL,
                    budget_code TEXT NOT NULL,
                    schema_version TEXT NOT NULL,
                    data TEXT NOT NULL,
                    priority INTEGER NOT NULL,
                    status TEXT NOT NULL DEFAULT 'queued',
                    retry_count INTEGER NOT NULL DEFAULT 0,
                    created_at REAL DEFAULT (strftime('%s', 'now'))
                )
            """)
            conn.execute("CREATE INDEX IF NOT EXISTS idx_status_priority ON payload_queue(status, priority)")

    @contextmanager
    def _get_conn(self):
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise  # bare raise preserves the original traceback
        finally:
            conn.close()

    def enqueue(self, payload: FieldPayload) -> None:
        if payload.schema_version not in self.SUPPORTED_SCHEMAS:
            raise ValueError(f"Unsupported schema version: {payload.schema_version}")

        payload.id = payload.id or hashlib.sha256(
            f"{payload.device_id}:{payload.timestamp}:{payload.payload_type}".encode()
        ).hexdigest()

        with self._get_conn() as conn:
            cursor = conn.execute("""
                INSERT OR IGNORE INTO payload_queue
                (id, device_id, timestamp, payload_type, wbs_code, budget_code, schema_version, data, priority, status, retry_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                payload.id, payload.device_id, payload.timestamp, payload.payload_type,
                payload.wbs_code, payload.budget_code, payload.schema_version,
                json.dumps(payload.data), payload.priority.value, payload.status.value, payload.retry_count
            ))
            # rowcount is 0 when INSERT OR IGNORE skipped an existing id.
            if cursor.rowcount == 0:
                logging.info(f"Duplicate payload {payload.id} ignored")
            else:
                logging.info(f"Enqueued payload {payload.id} (Priority: {payload.priority.name})")

    def get_pending_payloads(self, limit: int = 50) -> List[FieldPayload]:
        with self._get_conn() as conn:
            rows = conn.execute("""
                SELECT * FROM payload_queue
                WHERE status = ?
                ORDER BY priority ASC, timestamp ASC
                LIMIT ?
            """, (RoutingStatus.QUEUED.value, limit)).fetchall()
            return [
                FieldPayload(
                    id=row["id"], device_id=row["device_id"], timestamp=row["timestamp"],
                    payload_type=row["payload_type"], wbs_code=row["wbs_code"],
                    budget_code=row["budget_code"], schema_version=row["schema_version"],
                    data=json.loads(row["data"]), priority=PayloadPriority(row["priority"]),
                    status=RoutingStatus(row["status"]), retry_count=row["retry_count"]
                ) for row in rows
            ]

    def dispatch_payload(self, payload: FieldPayload) -> bool:
        try:
            payload.status = RoutingStatus.DISPATCHING
            with self._get_conn() as conn:
                conn.execute("UPDATE payload_queue SET status = ? WHERE id = ?", (payload.status.value, payload.id))

            route_path = f"/api/v2/route/{payload.wbs_code}/{payload.payload_type}"
            response = self.session.post(
                f"{self.api_endpoint}{route_path}",
                json=asdict(payload),
                timeout=self.API_TIMEOUT
            )
            response.raise_for_status()

            with self._get_conn() as conn:
                conn.execute("UPDATE payload_queue SET status = ? WHERE id = ?", (RoutingStatus.SYNCED.value, payload.id))
            logging.info(f"Successfully dispatched payload {payload.id}")
            return True

        except requests.exceptions.RequestException as e:
            logging.warning(f"Dispatch failed for {payload.id}: {e}")
            self._handle_retry(payload)
            return False
        except Exception as e:
            logging.error(f"Unexpected error dispatching {payload.id}: {e}")
            self._handle_retry(payload)
            return False

    def _handle_retry(self, payload: FieldPayload) -> None:
        payload.retry_count += 1
        if payload.retry_count >= self.MAX_RETRIES:
            payload.status = RoutingStatus.FAILED
            logging.error(f"Max retries exceeded for payload {payload.id}. Marked as FAILED.")
        else:
            payload.status = RoutingStatus.QUEUED
            logging.info(f"Queuing retry {payload.retry_count}/{self.MAX_RETRIES} for payload {payload.id}")

        with self._get_conn() as conn:
            conn.execute(
                "UPDATE payload_queue SET status = ?, retry_count = ? WHERE id = ?",
                (payload.status.value, payload.retry_count, payload.id)
            )

    def process_queue(self) -> None:
        pending = self.get_pending_payloads()
        if not pending:
            logging.info("Queue empty. No payloads to dispatch.")
            return

        for payload in pending:
            success = self.dispatch_payload(payload)
            if not success:
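                # _handle_retry has already incremented retry_count, so the first
                # failure waits BASE_DELAY * 2**1 = 4s; the delay is capped at 60s.
                delay = min(self.BASE_DELAY * (2 ** payload.retry_count), 60)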
                logging.info(f"Backing off for {delay:.1f}s before next dispatch cycle.")
                time.sleep(delay)
                break  # Stop processing to respect backoff

if __name__ == "__main__":
    router = FallbackRouter(
        db_path="field_fallback_queue.db",
        api_endpoint="https://api.construction-platform.example.com",
        api_token="your_secure_token_here"
    )

    sample_payload = FieldPayload(
        id="co-2024-0892",
        device_id="tablet-site-04",
        timestamp=time.time(),
        payload_type="change_order",
        wbs_code="03.02.01",
        budget_code="MAT-CONCRETE",
        schema_version="2.0",
        data={"description": "Rebar substitution due to supply delay", "cost_impact_usd": 14500.00},
        priority=PayloadPriority.CRITICAL
    )

    router.enqueue(sample_payload)
    router.process_queue()
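
One gap in the listing above is worth noting: dispatch_payload marks a row as dispatching before the POST, and get_pending_payloads only selects queued rows. If the app is killed between those two steps, the row stays in dispatching and is never retried. A minimal recovery sketch follows, assuming a single dispatcher process per device and a server that deduplicates on the payload id; the function name `requeue_stale_dispatching` is hypothetical.

```python
import sqlite3


def requeue_stale_dispatching(conn: sqlite3.Connection) -> int:
    """Reset rows left in 'dispatching' (e.g. after a crash) back to 'queued'.

    Intended to run once at startup, before any dispatch cycle begins.
    Returns the number of rows that were reset.
    """
    cursor = conn.execute(
        "UPDATE payload_queue SET status = 'queued' WHERE status = 'dispatching'"
    )
    conn.commit()
    return cursor.rowcount
```

A recovered row may already have reached the server before the crash, so this step is only safe when the receiving API treats a repeated payload id as a no-op.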

Debugging & Validation Protocol

When deploying fallback routing to active job sites, systematic validation prevents silent data loss. Follow these steps to verify queue integrity and dispatch reliability:

  1. Simulate Network Degradation: Use tc (Linux) or Network Link Conditioner (macOS) to drop packets or throttle bandwidth to <100 kbps. Trigger payload creation and verify immediate SQLite insertion without API timeouts.
  2. Inspect Queue State: Query the local database directly to confirm priority ordering and status transitions:
  SELECT id, payload_type, priority, status, retry_count
  FROM payload_queue
  ORDER BY priority ASC, timestamp ASC;
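  3. Validate Schema Gates: Attempt to enqueue a payload with an unsupported schema_version. The engine should raise a ValueError and prevent database insertion, ensuring downstream parsers never encounter malformed structures.
  4. Monitor Backoff Behavior: Force repeated 503 Service Unavailable responses. Verify that retry_count increments by one per failed dispatch, that status toggles between dispatching and queued and becomes failed once retry_count reaches MAX_RETRIES, and that the delay matches min(BASE_DELAY * 2^n, 60 s), where n is retry_count after the increment. Expect each failed dispatch to take longer than a single request, because the HTTP adapter retries a 503 up to three times on its own before the router sees the failure.
  5. Audit Idempotency: Submit duplicate payloads with the same id, or with an empty id and identical device_id, timestamp, and payload_type (the three fields hashed to derive the id). Confirm INSERT OR IGNORE prevents queue bloat and that the routing layer processes only one authoritative record.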
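
The idempotency audit can be scripted against an in-memory database so it runs without a device or network. The sketch below is a simplified stand-in rather than a test of the full router: it reuses the same id derivation as enqueue(), but the single-column table and the helper names `derived_id` and `insert_once` are assumptions made for brevity.

```python
import hashlib
import sqlite3


def derived_id(device_id: str, timestamp: float, payload_type: str) -> str:
    """Same derivation enqueue() applies when the payload has no id."""
    raw = f"{device_id}:{timestamp}:{payload_type}"
    return hashlib.sha256(raw.encode()).hexdigest()


def insert_once(conn: sqlite3.Connection, payload_id: str) -> bool:
    """Return True if the row was inserted, False if it already existed."""
    cursor = conn.execute(
        "INSERT OR IGNORE INTO payload_queue (id) VALUES (?)", (payload_id,)
    )
    conn.commit()
    return cursor.rowcount == 1


def run_idempotency_audit() -> int:
    """Submit the same logical payload twice and return the final row count."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE payload_queue (id TEXT PRIMARY KEY)")
    pid = derived_id("tablet-site-04", 1700000000.0, "change_order")
    first = insert_once(conn, pid)
    second = insert_once(conn, pid)  # duplicate submission
    assert first and not second
    return conn.execute("SELECT COUNT(*) FROM payload_queue").fetchone()[0]
```

Note that a different payload_type with the same device and timestamp yields a different id, so it is treated as a separate record rather than a duplicate.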

Production Deployment Safeguards

  • Storage Limits: Implement a maximum queue size (e.g., 500 MB or 10,000 records). When thresholds are reached, purge FAILED records older than 30 days and alert site supervisors via SMS or local push notification.
  • Conflict Resolution: When multiple devices submit overlapping change orders for the same WBS code, apply a deterministic merge strategy. Use timestamp as the tiebreaker and flag budget discrepancies for estimator review before final sync.
  • Compliance & Audit Trails: Never delete SYNCED records immediately. Archive them to a read-only compliance table for 7 years per construction contract retention requirements. Include cryptographic hashes of the original payload to satisfy audit verification.
  • Battery & Resource Optimization: Schedule queue processing during device idle states or when charging. Avoid continuous polling; instead, use OS-level network state change listeners to trigger process_queue() only when connectivity stabilizes.
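
The storage and audit-trail safeguards above can be sketched as two maintenance functions over the same SQLite file. This is an illustration under stated assumptions: the `payload_archive` table and both function names are hypothetical, age is measured from the device-side timestamp column, and SQLite does not enforce read-only access to the archive, so that would need to be handled by the application or file permissions.

```python
import hashlib
import sqlite3
import time
from typing import Optional

THIRTY_DAYS_SECONDS = 30 * 24 * 3600


def archive_synced(conn: sqlite3.Connection) -> int:
    """Move SYNCED rows into an archive table alongside a SHA-256 of the payload."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS payload_archive (
            id TEXT PRIMARY KEY,
            data TEXT NOT NULL,
            sha256 TEXT NOT NULL,
            archived_at REAL NOT NULL
        )
    """)
    rows = conn.execute(
        "SELECT id, data FROM payload_queue WHERE status = 'synced'"
    ).fetchall()
    now = time.time()
    for payload_id, data in rows:
        digest = hashlib.sha256(data.encode("utf-8")).hexdigest()
        conn.execute(
            "INSERT OR IGNORE INTO payload_archive (id, data, sha256, archived_at) "
            "VALUES (?, ?, ?, ?)",
            (payload_id, data, digest, now),
        )
        # Delete by id so only rows that were just archived leave the queue.
        conn.execute("DELETE FROM payload_queue WHERE id = ?", (payload_id,))
    conn.commit()
    return len(rows)


def purge_old_failed(conn: sqlite3.Connection, now: Optional[float] = None) -> int:
    """Delete FAILED rows whose capture timestamp is older than 30 days."""
    cutoff = (now if now is not None else time.time()) - THIRTY_DAYS_SECONDS
    cursor = conn.execute(
        "DELETE FROM payload_queue WHERE status = 'failed' AND timestamp < ?",
        (cutoff,),
    )
    conn.commit()
    return cursor.rowcount
```

An auditor can later recompute the SHA-256 of the archived data column and compare it with the stored digest to confirm the record has not changed since archiving.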

By decoupling ingestion from transmission and enforcing strict priority routing, field applications maintain operational continuity regardless of network conditions. Combined with server-side deduplication on the payload id, this architecture is designed so that financial, safety, and scheduling data reaches central systems intact, preserving project timelines and contractual compliance.