Low-Level Design: Online Code Judge (LeetCode-style)
The online code judge LLD covers a system that accepts code submissions, compiles and executes them in isolation, compares output against expected results, and returns a verdict. It requires sandboxing, resource limits, and a clean submission pipeline. Asked at Databricks, Coinbase, Atlassian, Stripe, and other engineering-focused interviews.
Requirements
- Users submit code in multiple languages (Python, Java, C++, JavaScript).
- Judge runs code against multiple test cases.
- Enforce resource limits: CPU time (2 seconds), memory (256 MB), output size.
- Return verdict: Accepted, Wrong Answer, Time Limit Exceeded, Memory Limit Exceeded, Runtime Error, Compilation Error.
- Support custom test cases in problem creation.
Verdict Enum and Test Case Model
```python
from enum import Enum
from dataclasses import dataclass, field
import uuid


class Verdict(Enum):
    ACCEPTED = "AC"
    WRONG_ANSWER = "WA"
    TIME_LIMIT_EXCEEDED = "TLE"
    MEMORY_LIMIT_EXCEEDED = "MLE"
    RUNTIME_ERROR = "RE"
    COMPILATION_ERROR = "CE"
    PENDING = "PE"


@dataclass
class TestCase:
    test_id: str
    input_data: str
    expected_output: str


@dataclass
class Problem:
    problem_id: str
    title: str
    time_limit_s: float = 2.0
    mem_limit_mb: int = 256
    # default_factory avoids the shared-mutable-default pitfall
    test_cases: list[TestCase] = field(default_factory=list)


@dataclass
class Submission:
    submission_id: str
    problem_id: str
    user_id: str
    language: str
    code: str
    verdict: Verdict = Verdict.PENDING
    runtime_ms: float = 0.0
    memory_mb: float = 0.0
    error_message: str = ""
```
Sandboxed Executor
```python
import subprocess
import tempfile
import os
import resource  # POSIX-only; used to set limits in the child process
import time

LANGUAGE_CONFIG = {
    "python": {
        "ext": ".py",
        "compile": None,
        "run": ["python3", "{file}"],
    },
    "java": {
        "ext": ".java",
        "compile": ["javac", "{file}"],
        "run": ["java", "-cp", "{dir}", "Main"],
    },
    "cpp": {
        "ext": ".cpp",
        "compile": ["g++", "-O2", "-o", "{binary}", "{file}"],
        "run": ["{binary}"],
    },
}
```
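The `{file}`, `{binary}`, and `{dir}` placeholders in the command templates are filled per token with `str.format`; tokens without a placeholder pass through unchanged. A quick illustration (paths are made up):

```python
# Each token of a command template is passed through str.format; extra
# keyword arguments are simply ignored by tokens with no placeholder.
compile_template = ["g++", "-O2", "-o", "{binary}", "{file}"]
cmd = [part.format(file="/tmp/job/solution.cpp",
                   binary="/tmp/job/solution",
                   dir="/tmp/job")
       for part in compile_template]
print(cmd)  # ['g++', '-O2', '-o', '/tmp/job/solution', '/tmp/job/solution.cpp']
```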
```python
class SandboxedExecutor:
    def __init__(self, time_limit_s: float = 2.0,
                 mem_limit_mb: int = 256,
                 output_limit_bytes: int = 65536):
        self.time_limit = time_limit_s
        self.mem_limit = mem_limit_mb * 1024 * 1024
        self.output_limit = output_limit_bytes

    def run_test(self, code: str, language: str,
                 input_data: str, expected: str) -> dict:
        config = LANGUAGE_CONFIG.get(language)
        if not config:
            return {"verdict": Verdict.COMPILATION_ERROR,
                    "error": f"Unsupported language: {language}"}
        with tempfile.TemporaryDirectory() as tmpdir:
            # javac requires the file name to match the public class,
            # so Java sources must be written as Main.java.
            base = "Main" if language == "java" else "solution"
            filename = os.path.join(tmpdir, base + config["ext"])
            binary = os.path.join(tmpdir, "solution")
            with open(filename, "w") as f:
                f.write(code)

            # Compile step (skipped for interpreted languages)
            if config["compile"]:
                compile_cmd = [c.format(file=filename, binary=binary, dir=tmpdir)
                               for c in config["compile"]]
                result = subprocess.run(
                    compile_cmd, capture_output=True, text=True, timeout=30
                )
                if result.returncode != 0:
                    return {"verdict": Verdict.COMPILATION_ERROR,
                            "error": result.stderr[:500]}

            # Execute step
            run_cmd = [c.format(file=filename, binary=binary, dir=tmpdir)
                       for c in config["run"]]
            start = time.monotonic()
            try:
                proc = subprocess.run(
                    run_cmd,
                    input=input_data,
                    capture_output=True,
                    text=True,
                    timeout=self.time_limit + 0.5,  # small wall-clock buffer
                    preexec_fn=self._set_limits,    # POSIX-only
                )
                elapsed_ms = (time.monotonic() - start) * 1000
            except subprocess.TimeoutExpired:
                return {"verdict": Verdict.TIME_LIMIT_EXCEEDED,
                        "runtime_ms": self.time_limit * 1000}
            if proc.returncode != 0:
                # A process killed after exhausting RLIMIT_AS also lands here;
                # distinguishing MLE requires inspecting the signal or cgroup stats.
                return {"verdict": Verdict.RUNTIME_ERROR,
                        "error": proc.stderr[:200]}
            if elapsed_ms > self.time_limit * 1000:
                # Finished within the grace buffer but over the stated limit.
                return {"verdict": Verdict.TIME_LIMIT_EXCEEDED,
                        "runtime_ms": elapsed_ms}
            actual = proc.stdout[:self.output_limit].strip()
            if actual == expected.strip():
                return {"verdict": Verdict.ACCEPTED, "runtime_ms": elapsed_ms}
            return {"verdict": Verdict.WRONG_ANSWER,
                    "runtime_ms": elapsed_ms,
                    "actual": actual[:200]}

    def _set_limits(self):
        """Called in the child process between fork and exec."""
        resource.setrlimit(resource.RLIMIT_AS,
                           (self.mem_limit, self.mem_limit))
        resource.setrlimit(resource.RLIMIT_CPU,
                           (int(self.time_limit) + 1,
                            int(self.time_limit) + 1))
```
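The `preexec_fn` hook is the core of this sandboxing approach. A minimal, self-contained sketch (POSIX-only, independent of the classes above) showing `RLIMIT_CPU` stopping a busy loop:

```python
import resource
import subprocess

def limit_cpu_seconds(n: int):
    """Return a preexec_fn that caps the child's CPU time at n seconds."""
    def setter():
        # The kernel sends SIGXCPU (fatal by default) at the soft limit.
        resource.setrlimit(resource.RLIMIT_CPU, (n, n))
    return setter

# A busy loop that would otherwise spin forever:
proc = subprocess.run(
    ["python3", "-c", "while True: pass"],
    preexec_fn=limit_cpu_seconds(1),
    timeout=10,  # wall-clock backstop enforced by the parent
)
print(proc.returncode)  # non-zero: the child was killed by SIGXCPU
```

Note the two layers: the rlimit bounds CPU time inside the child, while `timeout=` guards against processes that burn no CPU (e.g. sleeping forever).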
Judge Service
```python
class JudgeService:
    def __init__(self):
        self.problems: dict[str, Problem] = {}
        self.submissions: dict[str, Submission] = {}

    def add_problem(self, problem: Problem) -> None:
        self.problems[problem.problem_id] = problem

    def submit(self, problem_id: str, user_id: str,
               language: str, code: str) -> Submission:
        problem = self.problems.get(problem_id)
        if not problem:
            raise ValueError(f"Problem {problem_id} not found")
        sub = Submission(
            submission_id=str(uuid.uuid4())[:8],
            problem_id=problem_id,
            user_id=user_id,
            language=language,
            code=code,
        )
        self.submissions[sub.submission_id] = sub
        # Executor is configured per problem, using its specific limits
        executor = SandboxedExecutor(
            time_limit_s=problem.time_limit_s,
            mem_limit_mb=problem.mem_limit_mb,
        )
        # Run against all test cases; stop at first failure
        for tc in problem.test_cases:
            result = executor.run_test(code, language,
                                       tc.input_data, tc.expected_output)
            sub.verdict = result["verdict"]
            sub.runtime_ms = result.get("runtime_ms", 0)
            sub.error_message = result.get("error", result.get("actual", ""))
            if sub.verdict != Verdict.ACCEPTED:
                break  # stop at first failing test case
        return sub

    def get_submission(self, submission_id: str) -> Submission | None:
        return self.submissions.get(submission_id)
```
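The end-to-end flow can be exercised with a stripped-down, Python-only sketch (`judge_python` is a hypothetical helper: no compile step and no resource limits, just the verdict logic):

```python
import os
import subprocess
import tempfile

def judge_python(code: str, tests: list[tuple[str, str]],
                 time_limit_s: float = 2.0) -> str:
    """Judge `code` against (stdin, expected_stdout) pairs; return a verdict string."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, "solution.py")
        with open(path, "w") as f:
            f.write(code)
        for stdin_data, expected in tests:
            try:
                proc = subprocess.run(["python3", path], input=stdin_data,
                                      capture_output=True, text=True,
                                      timeout=time_limit_s)
            except subprocess.TimeoutExpired:
                return "TLE"
            if proc.returncode != 0:
                return "RE"
            if proc.stdout.strip() != expected.strip():
                return "WA"  # stop at the first failing test case
    return "AC"

print(judge_python("print(int(input()) * 2)", [("3", "6"), ("10", "20")]))  # AC
print(judge_python("print(42)", [("3", "6")]))  # WA
```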
Scaling Architecture
- Async execution: submissions are queued to Kafka. Judge worker pool pulls from the queue and executes in parallel. Results written to a database; clients poll or receive WebSocket push.
- Containerized sandbox: each submission runs in a Docker container with cgroup limits (CPU, memory, disk I/O) instead of setrlimit. This isolates submissions from each other and from the host.
- Worker pool auto-scaling: submission rate drives worker count. During contests, scale to hundreds of workers.
- Test case storage: test cases stored in S3; workers fetch on demand and cache locally.
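The queue-plus-worker-pool shape can be sketched in-process (a `queue.Queue` and a dict stand in for Kafka and the results database, and the verdict logic is stubbed out):

```python
import queue
import threading

jobs: queue.Queue = queue.Queue()      # stand-in for the Kafka topic
results: dict[str, str] = {}           # stand-in for the results database
lock = threading.Lock()

def worker():
    while True:
        sub = jobs.get()
        if sub is None:                # poison pill: shut the worker down
            jobs.task_done()
            break
        # A real worker would invoke the sandboxed executor here.
        verdict = "AC" if sub["code"].strip() else "CE"
        with lock:
            results[sub["id"]] = verdict
        jobs.task_done()

workers = [threading.Thread(target=worker) for _ in range(4)]
for w in workers:
    w.start()

for i in range(10):
    jobs.put({"id": f"sub-{i}", "code": "print(1)"})
jobs.join()                            # wait until every submission is judged
for _ in workers:
    jobs.put(None)
for w in workers:
    w.join()

print(len(results))  # 10
```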
Security Considerations
- Process isolation: containers with network disabled, read-only filesystem, no root.
- Output size limit: truncate stdout at 64 KB to prevent memory exhaustion via print.
- Syscall filtering: seccomp profile blocks dangerous syscalls (fork-bomb prevention, socket creation).
- Separate judge network: workers on an isolated subnet with no internet access.
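Most of these controls map directly to `docker run` flags; a sketch of a hardened invocation (image name, seccomp profile path, and mount paths are illustrative):

```shell
# --network=none   : no network access for the submission
# --read-only      : read-only root filesystem
# --memory/--cpus  : cgroup limits replace setrlimit
# --pids-limit     : caps process count (fork-bomb guard)
# --user nobody    : never run as root
docker run --rm --network=none --read-only \
  --memory=256m --cpus=1 --pids-limit=64 --user=nobody \
  --security-opt seccomp=judge-seccomp.json \
  -v /tmp/sub123:/box:ro \
  judge-runner:latest /box/run.sh
```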
Interview Extensions
How would you prevent fork bombs and infinite loops?
Fork bombs: set RLIMIT_NPROC to cap the number of processes a submission can spawn. Infinite loops: enforce a CPU time limit via RLIMIT_CPU (the kernel kills the process once it exceeds N CPU-seconds) plus a wall-clock timeout via subprocess.run(timeout=). The rlimits are applied in the child process before exec via preexec_fn=_set_limits; the wall-clock timeout is enforced by the parent and also catches processes that burn no CPU (e.g. sleeping).
How do you handle special judges (problems with multiple valid outputs)?
Instead of string comparison, run a custom checker program that takes (input, expected_output, actual_output) as arguments and returns 0 for accepted, non-zero for wrong. The JudgeService calls the checker executable instead of comparing strings. This supports floating-point tolerance, multiple correct answers, and interactive problems.
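A checker for floating-point tolerance, following the (input, expected_output, actual_output) convention described above (`float_checker` is a hypothetical example, shown as an in-process function rather than a separate executable):

```python
def float_checker(input_data: str, expected: str, actual: str,
                  tol: float = 1e-6) -> bool:
    """Accept if both outputs parse as float sequences of equal length
    whose elements agree within an absolute tolerance."""
    try:
        exp = [float(tok) for tok in expected.split()]
        act = [float(tok) for tok in actual.split()]
    except ValueError:
        return False  # actual output is not numeric
    return len(exp) == len(act) and all(
        abs(e - a) <= tol for e, a in zip(exp, act))

print(float_checker("", "3.141593", "3.1415926535"))  # True
print(float_checker("", "3.141593", "2.718282"))      # False
```

The JudgeService would call this instead of the exact string comparison when the problem is flagged as having a special judge.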