Designing fallback routing for disconnected field devices
Field connectivity on active construction sites remains inherently unreliable. Cellular dead zones, RF interference from heavy machinery, and temporary network outages routinely interrupt data transmission from tablets, IoT sensors, and wearable safety monitors. When a device drops offline, the routing layer must transition from synchronous API calls to a deterministic, locally cached fallback mechanism. This ensures that critical payloads—change orders, RFIs, daily logs, and budget impact assessments—retain their intended routing paths without data loss or compliance violations. The foundation of this resilience lies in a structured approach to Construction Data Architecture & Taxonomy, which dictates how payloads are classified, prioritized, and ultimately delivered once connectivity is restored.
Routing Logic & Queue Architecture
Fallback routing requires a strict separation between ingestion, local persistence, and dispatch. When a field device detects network degradation, the client must immediately serialize the payload, attach immutable metadata (device ID, local timestamp, WBS code, budget classification, and schema version), and push it to a persistent local queue. The routing engine evaluates each queued item against a deterministic rule set before attempting transmission.
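The capture step described above can be sketched as a small envelope type. This is a minimal illustration, not the article's implementation: the names Envelope, wrap_payload, and serialize are hypothetical, and the default schema version of "2.0" is an assumption.

```python
import json
import time
import uuid
from dataclasses import asdict, dataclass
from typing import Any, Dict


# frozen=True blocks attribute reassignment after capture.
# Note: the nested body dict itself is still mutable.
@dataclass(frozen=True)
class Envelope:
    payload_id: str
    device_id: str
    local_timestamp: float
    wbs_code: str
    budget_code: str
    schema_version: str
    body: Dict[str, Any]


def wrap_payload(
    body: Dict[str, Any],
    device_id: str,
    wbs_code: str,
    budget_code: str,
    schema_version: str = "2.0",  # assumed default, for illustration only
) -> Envelope:
    """Attach routing metadata at the moment of capture on the device."""
    return Envelope(
        payload_id=uuid.uuid4().hex,
        device_id=device_id,
        local_timestamp=time.time(),
        wbs_code=wbs_code,
        budget_code=budget_code,
        schema_version=schema_version,
        body=dict(body),  # shallow copy so later edits to the caller's dict do not leak in
    )


def serialize(envelope: Envelope) -> str:
    """Serialize with sorted keys so the stored text is stable across runs."""
    return json.dumps(asdict(envelope), sort_keys=True)
```

The serialized string is what would be written to the local queue; dispatch can then run later without touching the capture path.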
Priority is derived from payload type and financial impact. Safety incidents and change orders exceeding predefined thresholds bypass standard queues and trigger immediate escalation upon reconnection. Standard RFIs and submittal metadata follow FIFO ordering with strict schema validation gates. This tiered approach aligns directly with Fallback Alert Routing principles, ensuring that high-impact data never stalls behind routine administrative logs.
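One way to express that rule set is a pure function from payload type and cost impact to a priority tier. The sketch below is hedged: the 10,000 USD threshold, the payload type strings, and the choice to rank below-threshold change orders as HIGH are all assumptions made for illustration, not values taken from any real project.

```python
from enum import IntEnum


class PayloadPriority(IntEnum):
    CRITICAL = 1
    HIGH = 2
    NORMAL = 3


# Hypothetical escalation threshold; a real value would come from project configuration.
CHANGE_ORDER_ESCALATION_USD = 10_000.0


def derive_priority(payload_type: str, cost_impact_usd: float = 0.0) -> PayloadPriority:
    """Map a payload to a queue tier from its type and financial impact."""
    if payload_type == "safety_incident":
        return PayloadPriority.CRITICAL
    if payload_type == "change_order":
        # Change orders above the threshold escalate; smaller ones are
        # assumed to rank HIGH rather than NORMAL.
        if cost_impact_usd > CHANGE_ORDER_ESCALATION_USD:
            return PayloadPriority.CRITICAL
        return PayloadPriority.HIGH
    # RFIs, submittal metadata, and daily logs fall through to FIFO handling.
    return PayloadPriority.NORMAL
```

Keeping the rule a pure function makes it straightforward to unit test and to version alongside the schema.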
The architecture must also account for network flapping. Rapid oscillation between online and offline states can cause duplicate submissions or race conditions. A deterministic dispatch loop with exponential backoff, jitter, and idempotency keys prevents payload duplication while guaranteeing eventual consistency.
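A minimal sketch of the backoff-with-jitter calculation follows. It uses the "full jitter" variant (a uniform draw between zero and the capped exponential ceiling), which is one common choice among several. The function names are hypothetical, and the Idempotency-Key header is only a convention: it helps only if the receiving API is built to honor it.

```python
import random
from typing import Dict, Optional


def backoff_delay(
    attempt: int,
    base: float = 2.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Return a randomized delay in seconds for the given retry attempt."""
    if rng is None:
        rng = random.Random()
    ceiling = min(cap, base * (2 ** attempt))
    # Full jitter: spread devices across the whole window so they do not
    # all retry at the same instant when a site regains coverage.
    return rng.uniform(0.0, ceiling)


def idempotency_headers(payload_id: str) -> Dict[str, str]:
    """Reuse the queue ID as the idempotency key (assumes server-side support)."""
    return {"Idempotency-Key": payload_id}
```

Because the key is the stable payload ID rather than a per-request value, a retry after a dropped response carries the same key as the original attempt.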
Python Implementation
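The following implementation demonstrates a fallback routing engine. It uses SQLite for local persistence, enforces schema version compatibility, derives the route path from the WBS code and payload type, and backs off exponentially after a failed dispatch. Queue writes run inside transactions, and dispatch failures are caught, logged, and retried up to MAX_RETRIES times. The listing does not add jitter or send an idempotency key; deduplication relies on the payload ID serving as the queue's primary key.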
import sqlite3
import json
import time
import logging
import hashlib
import requests
from dataclasses import dataclass, asdict
from typing import Optional, List, Dict, Any
from enum import Enum, IntEnum
from contextlib import contextmanager
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)

class PayloadPriority(IntEnum):
    # Lower value sorts first in the dispatch query.
    CRITICAL = 1
    HIGH = 2
    NORMAL = 3


class RoutingStatus(str, Enum):
    QUEUED = "queued"
    DISPATCHING = "dispatching"
    SYNCED = "synced"
    FAILED = "failed"


@dataclass
class FieldPayload:
    id: str
    device_id: str
    timestamp: float
    payload_type: str
    wbs_code: str
    budget_code: str
    schema_version: str
    data: Dict[str, Any]
    priority: PayloadPriority = PayloadPriority.NORMAL
    status: RoutingStatus = RoutingStatus.QUEUED
    retry_count: int = 0

class FallbackRouter:
    SUPPORTED_SCHEMAS = {"1.0", "1.1", "2.0"}
    MAX_RETRIES = 5
    BASE_DELAY = 2.0    # seconds
    API_TIMEOUT = 15.0  # seconds

    def __init__(self, db_path: str, api_endpoint: str, api_token: str):
        self.db_path = db_path
        self.api_endpoint = api_endpoint.rstrip("/")
        self.session = self._configure_session(api_token)
        self._init_db()

    def _configure_session(self, token: str) -> requests.Session:
        session = requests.Session()
        session.headers.update({"Authorization": f"Bearer {token}", "Content-Type": "application/json"})
        # Transport-level retries for transient HTTP errors, applied to each POST.
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["POST"]
        )
        session.mount("https://", HTTPAdapter(max_retries=retry_strategy))
        return session

    def _init_db(self) -> None:
        with self._get_conn() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS payload_queue (
                    id TEXT PRIMARY KEY,
                    device_id TEXT NOT NULL,
                    timestamp REAL NOT NULL,
                    payload_type TEXT NOT NULL,
                    wbs_code TEXT NOT NULL,
                    budget_code TEXT NOT NULL,
                    schema_version TEXT NOT NULL,
                    data TEXT NOT NULL,
                    priority INTEGER NOT NULL,
                    status TEXT NOT NULL DEFAULT 'queued',
                    retry_count INTEGER NOT NULL DEFAULT 0,
                    created_at REAL DEFAULT (strftime('%s', 'now'))
                )
            """)
            conn.execute("CREATE INDEX IF NOT EXISTS idx_status_priority ON payload_queue(status, priority)")
            # Re-queue rows left in 'dispatching' by a crash mid-send; otherwise they
            # are never selected again. Assumes one router process per database file.
            conn.execute(
                "UPDATE payload_queue SET status = ? WHERE status = ?",
                (RoutingStatus.QUEUED.value, RoutingStatus.DISPATCHING.value)
            )

    @contextmanager
    def _get_conn(self):
        # One short-lived connection per operation: commit on success, roll back on error.
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def enqueue(self, payload: FieldPayload) -> None:
        if payload.schema_version not in self.SUPPORTED_SCHEMAS:
            raise ValueError(f"Unsupported schema version: {payload.schema_version}")
        # Derive a stable ID when none is supplied so resubmissions collapse to one row.
        payload.id = payload.id or hashlib.sha256(
            f"{payload.device_id}:{payload.timestamp}:{payload.payload_type}".encode()
        ).hexdigest()
        with self._get_conn() as conn:
            cursor = conn.execute("""
                INSERT OR IGNORE INTO payload_queue
                (id, device_id, timestamp, payload_type, wbs_code, budget_code, schema_version, data, priority, status, retry_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                payload.id, payload.device_id, payload.timestamp, payload.payload_type,
                payload.wbs_code, payload.budget_code, payload.schema_version,
                json.dumps(payload.data), payload.priority.value, payload.status.value, payload.retry_count
            ))
            inserted = cursor.rowcount > 0  # 0 when the ID already exists and the row was ignored
        if inserted:
            logging.info(f"Enqueued payload {payload.id} (Priority: {payload.priority.name})")
        else:
            logging.info(f"Payload {payload.id} already queued; duplicate ignored")

    def get_pending_payloads(self, limit: int = 50) -> List[FieldPayload]:
        # Highest priority first, then oldest first within each priority tier.
        with self._get_conn() as conn:
            rows = conn.execute("""
                SELECT * FROM payload_queue
                WHERE status = ?
                ORDER BY priority ASC, timestamp ASC
                LIMIT ?
            """, (RoutingStatus.QUEUED.value, limit)).fetchall()
        return [
            FieldPayload(
                id=row["id"], device_id=row["device_id"], timestamp=row["timestamp"],
                payload_type=row["payload_type"], wbs_code=row["wbs_code"],
                budget_code=row["budget_code"], schema_version=row["schema_version"],
                data=json.loads(row["data"]), priority=PayloadPriority(row["priority"]),
                status=RoutingStatus(row["status"]), retry_count=row["retry_count"]
            ) for row in rows
        ]

    def dispatch_payload(self, payload: FieldPayload) -> bool:
        try:
            payload.status = RoutingStatus.DISPATCHING
            with self._get_conn() as conn:
                conn.execute("UPDATE payload_queue SET status = ? WHERE id = ?", (payload.status.value, payload.id))
            route_path = f"/api/v2/route/{payload.wbs_code}/{payload.payload_type}"
            response = self.session.post(
                f"{self.api_endpoint}{route_path}",
                json=asdict(payload),
                timeout=self.API_TIMEOUT
            )
            response.raise_for_status()
            with self._get_conn() as conn:
                conn.execute("UPDATE payload_queue SET status = ? WHERE id = ?", (RoutingStatus.SYNCED.value, payload.id))
            logging.info(f"Successfully dispatched payload {payload.id}")
            return True
        except requests.exceptions.RequestException as e:
            logging.warning(f"Dispatch failed for {payload.id}: {e}")
            self._handle_retry(payload)
            return False
        except Exception as e:
            logging.error(f"Unexpected error dispatching {payload.id}: {e}")
            self._handle_retry(payload)
            return False

    def _handle_retry(self, payload: FieldPayload) -> None:
        payload.retry_count += 1
        if payload.retry_count >= self.MAX_RETRIES:
            payload.status = RoutingStatus.FAILED
            logging.error(f"Max retries exceeded for payload {payload.id}. Marked as FAILED.")
        else:
            payload.status = RoutingStatus.QUEUED
            logging.info(f"Queuing retry {payload.retry_count}/{self.MAX_RETRIES} for payload {payload.id}")
        with self._get_conn() as conn:
            conn.execute(
                "UPDATE payload_queue SET status = ?, retry_count = ? WHERE id = ?",
                (payload.status.value, payload.retry_count, payload.id)
            )

    def process_queue(self) -> None:
        pending = self.get_pending_payloads()
        if not pending:
            logging.info("Queue empty. No payloads to dispatch.")
            return
        for payload in pending:
            success = self.dispatch_payload(payload)
            if not success:
                # retry_count was already incremented by _handle_retry.
                delay = min(self.BASE_DELAY * (2 ** payload.retry_count), 60)
                logging.info(f"Backing off for {delay:.1f}s before next dispatch cycle.")
                time.sleep(delay)
                break  # Stop processing to respect backoff

if __name__ == "__main__":
    router = FallbackRouter(
        db_path="field_fallback_queue.db",
        api_endpoint="https://api.construction-platform.example.com",
        api_token="your_secure_token_here"
    )
    sample_payload = FieldPayload(
        id="co-2024-0892",
        device_id="tablet-site-04",
        timestamp=time.time(),
        payload_type="change_order",
        wbs_code="03.02.01",
        budget_code="MAT-CONCRETE",
        schema_version="2.0",
        data={"description": "Rebar substitution due to supply delay", "cost_impact_usd": 14500.00},
        priority=PayloadPriority.CRITICAL
    )
    router.enqueue(sample_payload)
    router.process_queue()

Debugging & Validation Protocol
When deploying fallback routing to active job sites, systematic validation prevents silent data loss. Follow these steps to verify queue integrity and dispatch reliability:
- Simulate Network Degradation: Use tc (Linux) or Network Link Conditioner (macOS) to drop packets or throttle bandwidth to <100 kbps. Trigger payload creation and verify immediate SQLite insertion without API timeouts.
- Inspect Queue State: Query the local database directly to confirm priority ordering and status transitions:

    SELECT id, payload_type, priority, status, retry_count
    FROM payload_queue
    ORDER BY priority ASC, timestamp ASC;

- Validate Schema Gates: Attempt to enqueue a payload with an unsupported schema_version. The engine should raise a ValueError and prevent database insertion, ensuring downstream parsers never encounter malformed structures.
- Monitor Backoff Behavior: Force repeated 503 Service Unavailable responses. Verify that retry_count increments correctly, status toggles between dispatching and queued, and the exponential delay matches BASE_DELAY * (2^n), where n is the retry count after the failed attempt and the result is capped at 60 seconds.
- Audit Idempotency: Submit duplicate payloads with the same id, or with an empty id and identical device_id, timestamp, and payload_type so that the derived hash matches. Confirm INSERT OR IGNORE prevents queue bloat and that the routing layer processes only one authoritative record.
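The idempotency audit in the last step can be reproduced in isolation with an in-memory database. This is a small sketch under stated assumptions: the table is trimmed to two columns for brevity, and the helper names derive_id and count_after_duplicate_insert are hypothetical. The hash input mirrors the device_id:timestamp:payload_type format used by enqueue.

```python
import hashlib
import sqlite3


def derive_id(device_id: str, timestamp: float, payload_type: str) -> str:
    """Build the same content-derived ID that enqueue falls back to."""
    return hashlib.sha256(f"{device_id}:{timestamp}:{payload_type}".encode()).hexdigest()


def count_after_duplicate_insert() -> int:
    """Insert the same payload twice and report how many rows survive."""
    conn = sqlite3.connect(":memory:")
    # Trimmed-down table: only the columns needed to show the dedupe behavior.
    conn.execute("CREATE TABLE payload_queue (id TEXT PRIMARY KEY, payload_type TEXT NOT NULL)")
    payload_id = derive_id("tablet-site-04", 1700000000.0, "rfi")
    for _ in range(2):
        # The second insert hits the primary-key conflict and is silently skipped.
        conn.execute(
            "INSERT OR IGNORE INTO payload_queue (id, payload_type) VALUES (?, ?)",
            (payload_id, "rfi"),
        )
    (count,) = conn.execute("SELECT COUNT(*) FROM payload_queue").fetchone()
    conn.close()
    return count
```

Running the same check against the real queue file after a simulated network flap confirms that resubmitted payloads do not accumulate.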
Production Deployment Safeguards
- Storage Limits: Implement a maximum queue size (e.g., 500 MB or 10,000 records). When thresholds are reached, purge FAILED records older than 30 days and alert site supervisors via SMS or local push notification.
- Conflict Resolution: When multiple devices submit overlapping change orders for the same WBS code, apply a deterministic merge strategy. Use timestamp as the tiebreaker and flag budget discrepancies for estimator review before final sync.
- Compliance & Audit Trails: Never delete SYNCED records immediately. Archive them to a read-only compliance table for 7 years per construction contract retention requirements. Include cryptographic hashes of the original payload to satisfy audit verification.
- Battery & Resource Optimization: Schedule queue processing during device idle states or when charging. Avoid continuous polling; instead, use OS-level network state change listeners to trigger process_queue() only when connectivity stabilizes.
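The storage-limit safeguard could look roughly like the sketch below. It is illustrative only: the function name purge_stale_failed is hypothetical, the record limit and 30-day window are the example figures from the list above, age is measured against the device-local timestamp column (an assumption; created_at would also work), and supervisor alerting is left out.

```python
import sqlite3
import time
from typing import Optional

FAILED_TTL_SECONDS = 30 * 24 * 3600  # 30 days, per the example threshold above


def purge_stale_failed(
    conn: sqlite3.Connection,
    max_records: int = 10_000,
    now: Optional[float] = None,
) -> int:
    """Delete old FAILED rows once the queue reaches max_records; return rows removed."""
    now = time.time() if now is None else now
    (total,) = conn.execute("SELECT COUNT(*) FROM payload_queue").fetchone()
    if total < max_records:
        return 0  # under the limit: leave everything in place
    cursor = conn.execute(
        "DELETE FROM payload_queue WHERE status = 'failed' AND timestamp < ?",
        (now - FAILED_TTL_SECONDS,),
    )
    conn.commit()
    return cursor.rowcount
```

Only FAILED rows are eligible here, so queued and synced records are never touched by the purge; archiving SYNCED rows would be a separate step.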
By decoupling ingestion from transmission and enforcing strict priority routing, field applications maintain operational continuity regardless of network conditions. This architecture guarantees that financial, safety, and scheduling data reaches central systems intact, preserving project timelines and contractual compliance.