1. Backend — Implemented (2026-03-09)
Status: COMPLETED — All Tests Passing
Merged to main via commit 99ea583 (branch feature/kafka-job-status-sse). 42 unit tests + 8 integration tests passing. Jenkins CI/CD triggered.
Commit Summary
Add Kafka job status tracking with SSE for POST /geo/save
Replace fire-and-forget Kafka pattern with tracked async processing. The save endpoint now creates a PENDING JobStatus in MongoDB, wraps data in KafkaMessage(trackingId + payload), and returns 202 with trackingId. Kafka consumer updates job status to COMPLETED/DUPLICATE/FAILED and pushes real-time updates via SseEmitter to connected clients.
New endpoints:
GET /geo/status/stream/{trackingId}(SSE)GET /geo/status/{trackingId}(REST fallback)POST /auth/geo/save(auth proxy with ownerEmail injection)GET /auth/geo/status/stream/{trackingId}(SSE proxy)GET /auth/geo/status/{trackingId}(REST proxy)DELETE /auth/geo/{youtubeContentId}(auth proxy)
Geo content endpoints moved from AuthController to GeoDataController. Old /auth/adventuretubedata endpoints removed.
Test Coverage
Geospatial Service — Unit Tests (17 new)
ConsumerTest(6 tests): COMPLETED/DUPLICATE/FAILED flows, legacy format, invalid JSON, chapter/place countingJobStatusServiceTest(7 tests): createPendingJob, markCompleted/Duplicate/Failed with SSE push, not-found, findByTrackingIdSseEmitterManagerTest(4 tests): register, no-op for unknown, idempotent sends
Geospatial Service — Controller Tests (14 existing, updated)
AdventureTubeDataControllerTest: Updated save test for 202+trackingId response, added getStatus tests
Geospatial Service — Integration Tests (8)
JobStatusFullFlowIT: save→PENDING, REST status PENDING/COMPLETED/DUPLICATE, 404, SSE terminal, markFailed, TTLAdventureTubeDataFullStackIT: Updated for new Producer signature and response format
Auth Service — Component Tests (6 new)
GeoDataServiceComponentTest: saveWithOwnerEmail (ownerEmail injection), getJobStatus COMPLETED/DUPLICATE/404, deleteByYoutubeContentId, 403 ownership mismatch
Postman Collection Updated
- 5 create requests:
/auth/adventuretubedata→/auth/geo/save+ auto-save{{trackingId}}script - 5 delete requests:
/auth/adventuretubedata/{ytId}→/auth/geo/{ytId} - New “Job Status” folder: Get Job Status (REST) + Stream Job Status (SSE) using
{{trackingId}}
Architecture
iOS → POST /auth/geo/save + JWT
Auth Service: extract email from JWT → inject ownerEmail → forward to Geospatial
Geospatial: create PENDING JobStatus → publish KafkaMessage(trackingId + data) → return 202 {trackingId}
← 202 { trackingId, status: PENDING }
iOS → GET /auth/geo/status/stream/{trackingId} + JWT (SSE opens)
← event: { status: PENDING }
...Kafka consumer finishes...
← event: { status: COMPLETED, chaptersCount: 5, placesCount: 3 }
← connection closes
Key component: SseEmitterManager — in-memory ConcurrentHashMap<trackingId, SseEmitter> that bridges the Kafka consumer thread to the SSE connection thread.
Files Summary
New (9 source + 3 test = 12):
geospatial-service/.../model/enums/JobStatusEnum.java— PENDING, COMPLETED, DUPLICATE, FAILEDgeospatial-service/.../model/entity/JobStatus.java— MongoDB @Document with TTL 7dgeospatial-service/.../repository/JobStatusRepository.java— findByTrackingId()geospatial-service/.../kafka/KafkaMessage.java— Wrapper: { trackingId, AdventureTubeData }geospatial-service/.../sse/SseEmitterManager.java— ConcurrentHashMap bridgegeospatial-service/.../service/JobStatusService.java— create/mark status + SSE pushgeospatial-service/.../exceptions/JobNotFoundException.javaauth-service/.../controller/GeoDataController.java— @RequestMapping(“/auth/geo”)auth-service/.../service/GeoDataService.java— JWT email extraction + proxygeospatial-service/.../kafka/ConsumerTest.java— 6 unit testsgeospatial-service/.../service/JobStatusServiceTest.java— 7 unit testsauth-service/.../component/service/GeoDataServiceComponentTest.java— 6 component tests
Modified (8 source + 3 test = 11):
geospatial-service/.../exceptions/code/GeoErrorCode.java— added JOB_NOT_FOUNDgeospatial-service/.../exceptions/GlobalExceptionHandler.java— added JobNotFoundException handlergeospatial-service/.../kafka/Producer.java— new signature with trackingId, wraps in KafkaMessagegeospatial-service/.../kafka/Consumer.java— KafkaMessage deserialization, status trackinggeospatial-service/.../controller/AdventureTubeDataController.java— 202 + SSE + REST status endpointscommon-api/.../client/ServiceClient.java— added getSseStreamReactive()auth-service/.../controller/AuthController.java— removed old geo endpointsconfig-service/.../config/auth-service.yml— geospatial-service URL + circuit breakergeospatial-service/.../controller/AdventureTubeDataControllerTest.java— updated for new responsegeospatial-service/.../integration/AdventureTubeDataFullStackIT.java— updated Producer signaturegeospatial-service/.../integration/JobStatusFullFlowIT.java— 8 integration tests
Key Design Decisions
- trackingId = UUID (not youtubeContentID) — each submission is a distinct job
- ownerEmail injected by Auth Service from JWT — tamper-proof
- Auth Service uses JsonNode — no coupling to AdventureTubeData entity
- MongoDB jobStatus collection with TTL 7-day auto-cleanup via expireAt index
- SseEmitter on Tomcat (Spring MVC) — no WebFlux needed in geospatial-service
- Auth Service SSE proxy — WebClient bodyToFlux(ServerSentEvent) relay to iOS
- REST fallback GET /geo/status/{trackingId} — for non-SSE clients and Postman
- 30s SseEmitter timeout — if consumer hasn’t processed, client reconnects or uses REST
- Race condition handling — SSE endpoint checks current status on connect, sends immediately if terminal
- Backward compatibility — Kafka consumer tries KafkaMessage first, falls back to raw AdventureTubeData
2. iOS — Implemented (2026-03-09)
Status: COMPLETED — Build Verified
xcodebuild build succeeded with zero errors.
Summary
Replaced the old synchronous uploadStory() flow (which POSTed to APIService.rasberryTestServer and used URLSession.DataTaskPublisher) with the new async Kafka+SSE flow:
- POST to
/auth/geo/savewith JWT → receive202 AcceptedwithtrackingId - Open SSE stream at
/auth/geo/status/stream/{trackingId}for real-time status - Fall back to REST polling at
/auth/geo/status/{trackingId}if SSE fails - UI overlay shows real-time publishing progress with status updates
Files Created (2)
1. JobStatusDTO.swift
Path: AdventureTube/Services/APIService/Adventuretube/JobStatusDTO.swift
Response models matching the backend:
ServiceResponse<T: Decodable>— generic wrapper withsuccess,message,errorCode,data,timestampJobStatusDTO— containstrackingId,youtubeContentID,status,errorMessage,chaptersCount,placesCountJobStatusTypeenum —PENDING,COMPLETED,DUPLICATE,FAILEDwithisTerminalcomputed property
2. SSEClient.swift
Path: AdventureTube/Services/NetworkService/SSEClient.swift
Generic SSE parser using URLSessionDataDelegate + Combine:
- Uses
URLSessionDataTaskwith delegate for true streaming (not buffereddataTaskPublisher) - Implements
urlSession(_:dataTask:didReceive data:)to receive chunks incrementally - Maintains a
Stringbuffer, parsesdata:lines, emits events on double-newline boundaries - Exposes
PassthroughSubject<Data, Error>via.publisher→AnyPublisher<Data, Error> connect(url:headers:)starts the stream,disconnect()cancels and cleans up- Validates HTTP response status in
didReceive response:— sendsBackendErroron non-2xx - Handles cancellation gracefully (sends
.finished, not.failure) - 300s request timeout, 600s resource timeout
Files Modified (4)
3. AdventureTubeAPIService.swift
Path: AdventureTube/Services/APIService/Adventuretube/AdventureTubeAPIService.swift
Added activeSSEClient: SSEClient? property and 4 new methods:
publishGeoData(_ jsonData: Data)→AnyPublisher<ServiceResponse<JobStatusDTO>, Error>streamJobStatus(trackingId:)→AnyPublisher<JobStatusDTO, Error>pollJobStatus(trackingId:)→AnyPublisher<ServiceResponse<JobStatusDTO>, Error>cancelSSEStream()— disconnects active SSE client and nils the reference
4. AdventureTubeAPIProtocol.swift
Path: AdventureTube/Services/APIService/Adventuretube/AdventureTubeAPIProtocol.swift
Added 3 new method signatures to the protocol.
5. AddStoryViewVM.swift
Path: AdventureTube/Views/MyStory/AddStoryView/AddStoryViewVM.swift
New PublishingStatus enum, rewrote uploadStory() for async Kafka+SSE flow with SSE tracking, polling fallback, and central handleJobStatus() handler.
6. AddStoryView.swift
Path: AdventureTube/Views/MyStory/AddStoryView/AddStoryView.swift
New publishing overlay with status-specific UI (spinner, checkmark, warning, error).
Verification
| Test | Status |
|---|---|
Build: xcodebuild |
BUILD SUCCEEDED |
| Functional test: Publish story → overlay → status updates → “Published!” | Pending |
| Duplicate test: Publish same story → “Already Exists” message | Pending |
| SSE fallback test: Kill SSE mid-stream → polling fallback kicks in | Pending |
Core Data: storyEntity.isPublished set to true on COMPLETED |
Pending |
| JWT: Authorization header present on POST, SSE, and polling requests | Pending |
Git Branch
feature/content-management-sse
