Implementation update: Kafka + SSE in Backend and iOS

1. Backend — Implemented (2026-03-09)

Status: COMPLETED — All Tests Passing

Merged to main via commit 99ea583 (branch feature/kafka-job-status-sse). 42 unit tests + 8 integration tests passing. Jenkins CI/CD triggered.


Commit Summary

Add Kafka job status tracking with SSE for POST /geo/save

Replace fire-and-forget Kafka pattern with tracked async processing. The save endpoint now creates a PENDING JobStatus in MongoDB, wraps data in KafkaMessage(trackingId + payload), and returns 202 with trackingId. Kafka consumer updates job status to COMPLETED/DUPLICATE/FAILED and pushes real-time updates via SseEmitter to connected clients.
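The tracked save flow described above can be sketched roughly as follows. This is a minimal plain-JDK illustration, not the project's code: `SaveFlowSketch`, `SaveResponse`, the in-memory `jobStore` map (standing in for MongoDB) and the `publishedMessages` list (standing in for the Kafka topic) are all hypothetical names, and the payload is kept as a raw JSON string instead of an AdventureTubeData object.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Status values named in the post.
enum JobStatusEnum { PENDING, COMPLETED, DUPLICATE, FAILED }

// Hypothetical stand-in for the MongoDB JobStatus document (expireAt drives the 7-day TTL).
record JobStatus(String trackingId, JobStatusEnum status, Instant expireAt) {}

// Wrapper published to Kafka; the payload is a raw JSON string in this sketch.
record KafkaMessage(String trackingId, String payload) {}

// Hypothetical shape of the body returned with HTTP 202.
record SaveResponse(int httpStatus, String trackingId, JobStatusEnum status) {}

class SaveFlowSketch {
    // In-memory stand-ins for the MongoDB collection and the Kafka topic (assumptions).
    final Map<String, JobStatus> jobStore = new ConcurrentHashMap<>();
    final List<KafkaMessage> publishedMessages = new ArrayList<>();

    SaveResponse save(String payloadJson) {
        String trackingId = UUID.randomUUID().toString();
        Instant expireAt = Instant.now().plus(Duration.ofDays(7));
        // 1. Record the job as PENDING before anything is published.
        jobStore.put(trackingId, new JobStatus(trackingId, JobStatusEnum.PENDING, expireAt));
        // 2. Wrap the payload so the consumer can find the job again by trackingId.
        publishedMessages.add(new KafkaMessage(trackingId, payloadJson));
        // 3. Answer immediately; the consumer finishes the work asynchronously.
        return new SaveResponse(202, trackingId, JobStatusEnum.PENDING);
    }
}
```

Writing the PENDING record before publishing means a status lookup for a freshly returned trackingId always finds a job, even if the consumer has not started yet.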

New endpoints:

  • GET /geo/status/stream/{trackingId} (SSE)
  • GET /geo/status/{trackingId} (REST fallback)
  • POST /auth/geo/save (auth proxy with ownerEmail injection)
  • GET /auth/geo/status/stream/{trackingId} (SSE proxy)
  • GET /auth/geo/status/{trackingId} (REST proxy)
  • DELETE /auth/geo/{youtubeContentId} (auth proxy)

Geo content endpoints moved from AuthController to GeoDataController. Old /auth/adventuretubedata endpoints removed.


Test Coverage

Geospatial Service — Unit Tests (17 new)

  • ConsumerTest (6 tests): COMPLETED/DUPLICATE/FAILED flows, legacy format, invalid JSON, chapter/place counting
  • JobStatusServiceTest (7 tests): createPendingJob, markCompleted/Duplicate/Failed with SSE push, not-found, findByTrackingId
  • SseEmitterManagerTest (4 tests): register, no-op for unknown, idempotent sends

Geospatial Service — Controller Tests (14 existing, updated)

  • AdventureTubeDataControllerTest: Updated save test for 202+trackingId response, added getStatus tests

Geospatial Service — Integration Tests (8)

  • JobStatusFullFlowIT: save→PENDING, REST status PENDING/COMPLETED/DUPLICATE, 404, SSE terminal, markFailed, TTL
  • AdventureTubeDataFullStackIT: Updated for new Producer signature and response format

Auth Service — Component Tests (6 new)

  • GeoDataServiceComponentTest: saveWithOwnerEmail (ownerEmail injection), getJobStatus COMPLETED/DUPLICATE/404, deleteByYoutubeContentId, 403 ownership mismatch

Postman Collection Updated

  • 5 create requests: /auth/adventuretubedata → /auth/geo/save + auto-save {{trackingId}} script
  • 5 delete requests: /auth/adventuretubedata/{ytId} → /auth/geo/{ytId}
  • New “Job Status” folder: Get Job Status (REST) + Stream Job Status (SSE) using {{trackingId}}

Architecture

iOS → POST /auth/geo/save + JWT
  Auth Service: extract email from JWT → inject ownerEmail → forward to Geospatial
  Geospatial: create PENDING JobStatus → publish KafkaMessage(trackingId + data) → return 202 {trackingId}
  ← 202 { trackingId, status: PENDING }

iOS → GET /auth/geo/status/stream/{trackingId} + JWT (SSE opens)
  ← event: { status: PENDING }
  ...Kafka consumer finishes...
  ← event: { status: COMPLETED, chaptersCount: 5, placesCount: 3 }
  ← connection closes

Key component: SseEmitterManager — in-memory ConcurrentHashMap<trackingId, SseEmitter> that bridges the Kafka consumer thread to the SSE connection thread.
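A rough sketch of how such a bridge might look, using only the JDK. The `EventSink` interface is a hypothetical stand-in for Spring's SseEmitter so that the example runs without Spring, and `SseEmitterManagerSketch` and `RecordingSink` are illustrative names. The real class may differ in its method names and error handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for Spring's SseEmitter so the sketch runs on a plain JDK.
interface EventSink {
    void send(String event);
    void complete();
}

class SseEmitterManagerSketch {
    private final ConcurrentMap<String, EventSink> emitters = new ConcurrentHashMap<>();

    // Called on the request thread when a client opens the stream.
    void register(String trackingId, EventSink sink) {
        emitters.put(trackingId, sink);
    }

    // Called on the Kafka consumer thread. Unknown ids are a no-op.
    // A terminal event removes the entry, so a repeated send does nothing.
    boolean send(String trackingId, String event, boolean terminal) {
        EventSink sink = terminal ? emitters.remove(trackingId) : emitters.get(trackingId);
        if (sink == null) {
            return false;
        }
        sink.send(event);
        if (terminal) {
            sink.complete();
        }
        return true;
    }
}

// Test double that records what a connected client would have received.
class RecordingSink implements EventSink {
    final List<String> log = new ArrayList<>();
    public void send(String event) { log.add(event); }
    public void complete() { log.add("closed"); }
}
```

Removing the entry atomically on a terminal event is one way to get the "no-op for unknown" and "idempotent sends" behaviour the unit tests mention; whether the actual manager does it this way is an assumption.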


Files Summary

New (9 source + 3 test = 12):

  • geospatial-service/.../model/enums/JobStatusEnum.java — PENDING, COMPLETED, DUPLICATE, FAILED
  • geospatial-service/.../model/entity/JobStatus.java — MongoDB @Document with TTL 7d
  • geospatial-service/.../repository/JobStatusRepository.java — findByTrackingId()
  • geospatial-service/.../kafka/KafkaMessage.java — Wrapper: { trackingId, AdventureTubeData }
  • geospatial-service/.../sse/SseEmitterManager.java — ConcurrentHashMap bridge
  • geospatial-service/.../service/JobStatusService.java — create/mark status + SSE push
  • geospatial-service/.../exceptions/JobNotFoundException.java
  • auth-service/.../controller/GeoDataController.java — @RequestMapping("/auth/geo")
  • auth-service/.../service/GeoDataService.java — JWT email extraction + proxy
  • geospatial-service/.../kafka/ConsumerTest.java — 6 unit tests
  • geospatial-service/.../service/JobStatusServiceTest.java — 7 unit tests
  • auth-service/.../component/service/GeoDataServiceComponentTest.java — 6 component tests

Modified (8 source + 3 test = 11):

  • geospatial-service/.../exceptions/code/GeoErrorCode.java — added JOB_NOT_FOUND
  • geospatial-service/.../exceptions/GlobalExceptionHandler.java — added JobNotFoundException handler
  • geospatial-service/.../kafka/Producer.java — new signature with trackingId, wraps in KafkaMessage
  • geospatial-service/.../kafka/Consumer.java — KafkaMessage deserialization, status tracking
  • geospatial-service/.../controller/AdventureTubeDataController.java — 202 + SSE + REST status endpoints
  • common-api/.../client/ServiceClient.java — added getSseStreamReactive()
  • auth-service/.../controller/AuthController.java — removed old geo endpoints
  • config-service/.../config/auth-service.yml — geospatial-service URL + circuit breaker
  • geospatial-service/.../controller/AdventureTubeDataControllerTest.java — updated for new response
  • geospatial-service/.../integration/AdventureTubeDataFullStackIT.java — updated Producer signature
  • geospatial-service/.../integration/JobStatusFullFlowIT.java — 8 integration tests

Key Design Decisions

  • trackingId = UUID (not youtubeContentID) — each submission is a distinct job
  • ownerEmail injected by Auth Service from JWT — tamper-proof
  • Auth Service uses JsonNode — no coupling to AdventureTubeData entity
  • MongoDB jobStatus collection with TTL 7-day auto-cleanup via expireAt index
  • SseEmitter on Tomcat (Spring MVC) — no WebFlux needed in geospatial-service
  • Auth Service SSE proxy — WebClient bodyToFlux(ServerSentEvent) relay to iOS
  • REST fallback GET /geo/status/{trackingId} — for non-SSE clients and Postman
  • 30s SseEmitter timeout — if consumer hasn’t processed, client reconnects or uses REST
  • Race condition handling — SSE endpoint checks current status on connect, sends immediately if terminal
  • Backward compatibility — Kafka consumer tries KafkaMessage first, falls back to raw AdventureTubeData
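The race-condition point in the list above can be illustrated with a small plain-JDK sketch. All names here (`JobState`, `StatusStreamSketch`, the `jobs` and `listeners` maps) are hypothetical, and registering the listener before reading the stored status is an assumption about ordering, not something the post states. With that ordering a client may occasionally receive the terminal status twice, which is harmless if the client treats it idempotently.

```java
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

enum JobState {
    PENDING, COMPLETED, DUPLICATE, FAILED;

    // Every state except PENDING ends the job.
    boolean isTerminal() { return this != PENDING; }
}

class StatusStreamSketch {
    // Stand-ins for the MongoDB collection and the emitter map (assumptions).
    final Map<String, JobState> jobs = new ConcurrentHashMap<>();
    final Map<String, Consumer<JobState>> listeners = new ConcurrentHashMap<>();

    // Returns true if the stream stays open waiting for the consumer.
    boolean connect(String trackingId, Consumer<JobState> client) {
        // Register first so an update arriving during connect is not lost.
        listeners.put(trackingId, client);
        JobState current = jobs.get(trackingId);
        if (current == null) {
            listeners.remove(trackingId);
            throw new NoSuchElementException("Unknown trackingId: " + trackingId);
        }
        // Always send the current status right away.
        client.accept(current);
        if (current.isTerminal()) {
            // The consumer already finished: close instead of waiting for the timeout.
            listeners.remove(trackingId);
            return false;
        }
        return true;
    }

    // Called by the consumer once processing ends.
    void markFinished(String trackingId, JobState terminal) {
        jobs.put(trackingId, terminal);
        Consumer<JobState> client = listeners.remove(trackingId);
        if (client != null) {
            client.accept(terminal);
        }
    }
}
```

Without the check on connect, a client that opens the stream after the consumer has already finished would see nothing until the 30s timeout and would then have to fall back to the REST endpoint.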

2. iOS — Implemented (2026-03-09)

Status: COMPLETED — Build Verified

xcodebuild build succeeded with zero errors.


Summary

Replaced the old synchronous uploadStory() flow (which POSTed to APIService.rasberryTestServer and used URLSession.DataTaskPublisher) with the new async Kafka+SSE flow:

  1. POST to /auth/geo/save with JWT → receive 202 Accepted with trackingId
  2. Open SSE stream at /auth/geo/status/stream/{trackingId} for real-time status
  3. Fall back to REST polling at /auth/geo/status/{trackingId} if SSE fails
  4. UI overlay shows real-time publishing progress with status updates

Files Created (2)

1. JobStatusDTO.swift

Path: AdventureTube/Services/APIService/Adventuretube/JobStatusDTO.swift

Response models matching the backend:

  • ServiceResponse<T: Decodable> — generic wrapper with success, message, errorCode, data, timestamp
  • JobStatusDTO — contains trackingId, youtubeContentID, status, errorMessage, chaptersCount, placesCount
  • JobStatusType enum — PENDING, COMPLETED, DUPLICATE, FAILED with isTerminal computed property

2. SSEClient.swift

Path: AdventureTube/Services/NetworkService/SSEClient.swift

Generic SSE parser using URLSessionDataDelegate + Combine:

  • Uses URLSessionDataTask with delegate for true streaming (not buffered dataTaskPublisher)
  • Implements urlSession(_:dataTask:didReceive data:) to receive chunks incrementally
  • Maintains a String buffer, parses data: lines, emits events on double-newline boundaries
  • Exposes PassthroughSubject<Data, Error> via .publisher → AnyPublisher<Data, Error>
  • connect(url:headers:) starts the stream, disconnect() cancels and cleans up
  • Validates HTTP response status in didReceive response: — sends BackendError on non-2xx
  • Handles cancellation gracefully (sends .finished, not .failure)
  • 300s request timeout, 600s resource timeout
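The buffering and parsing logic in the list above is language-independent. The sketch below shows it in Java, to match the backend, and is not the Swift SSEClient itself. `SseBufferParser` and `feed` are hypothetical names. It assumes events are separated by "\n\n" (no "\r\n" handling) and, as a simplification, decodes each chunk as UTF-8 on its own, so a multi-byte character split across two chunks would be garbled.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SseBufferParser {
    // Text received so far that does not yet form a complete event.
    private final StringBuilder buffer = new StringBuilder();

    // Feed one network chunk; returns the data payload of every event it completes.
    List<String> feed(byte[] chunk) {
        buffer.append(new String(chunk, StandardCharsets.UTF_8));
        List<String> events = new ArrayList<>();
        int boundary;
        // A blank line (double newline) terminates an event.
        while ((boundary = buffer.indexOf("\n\n")) >= 0) {
            String block = buffer.substring(0, boundary);
            buffer.delete(0, boundary + 2);
            StringBuilder data = new StringBuilder();
            boolean hasData = false;
            for (String line : block.split("\n")) {
                if (!line.startsWith("data:")) {
                    continue; // event:, id:, retry: and comment lines are ignored here
                }
                String value = line.substring(5);
                if (value.startsWith(" ")) {
                    value = value.substring(1); // drop the single optional space after the colon
                }
                if (hasData) {
                    data.append('\n'); // multiple data: lines are joined with newlines
                }
                data.append(value);
                hasData = true;
            }
            if (hasData) {
                events.add(data.toString());
            }
        }
        return events;
    }
}
```

Keeping the unfinished tail in the buffer is what lets an event that arrives split across two chunks be emitted once, intact, when the second chunk completes it.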

Files Modified (4)

3. AdventureTubeAPIService.swift

Path: AdventureTube/Services/APIService/Adventuretube/AdventureTubeAPIService.swift

Added activeSSEClient: SSEClient? property and 4 new methods:

  • publishGeoData(_ jsonData: Data) → AnyPublisher<ServiceResponse<JobStatusDTO>, Error>
  • streamJobStatus(trackingId:) → AnyPublisher<JobStatusDTO, Error>
  • pollJobStatus(trackingId:) → AnyPublisher<ServiceResponse<JobStatusDTO>, Error>
  • cancelSSEStream() — disconnects active SSE client and nils the reference

4. AdventureTubeAPIProtocol.swift

Path: AdventureTube/Services/APIService/Adventuretube/AdventureTubeAPIProtocol.swift

Added 3 new method signatures to the protocol.

5. AddStoryViewVM.swift

Path: AdventureTube/Views/MyStory/AddStoryView/AddStoryViewVM.swift

Added a PublishingStatus enum and rewrote uploadStory() for the async Kafka+SSE flow, with SSE tracking, a polling fallback, and a central handleJobStatus() handler.

6. AddStoryView.swift

Path: AdventureTube/Views/MyStory/AddStoryView/AddStoryView.swift

New publishing overlay with status-specific UI (spinner, checkmark, warning, error).


Verification

Test | Status
Build: xcodebuild | BUILD SUCCEEDED
Functional test: Publish story → overlay → status updates → “Published!” | Pending
Duplicate test: Publish same story → “Already Exists” message | Pending
SSE fallback test: Kill SSE mid-stream → polling fallback kicks in | Pending
Core Data: storyEntity.isPublished set to true on COMPLETED | Pending
JWT: Authorization header present on POST, SSE, and polling requests | Pending

Git Branch

feature/content-management-sse
