Low-Level Design: Online Code Judge (LeetCode-style Submission System)

Low-Level Design: Online Code Judge (LeetCode-style)

The online code judge LLD involves designing a system that accepts code submissions, compiles and executes them in isolation, compares output against expected results, and returns a verdict. It requires sandboxing, resource limits, and a clean submission pipeline. Asked at Databricks, Coinbase, Atlassian, Stripe, and other engineering-focused interviews.

Requirements

  • Users submit code in multiple languages (Python, Java, C++, JavaScript).
  • Judge runs code against multiple test cases.
  • Enforce resource limits: CPU time (2 seconds), memory (256 MB), output size.
  • Return verdict: Accepted, Wrong Answer, Time Limit Exceeded, Memory Limit Exceeded, Runtime Error, Compilation Error.
  • Support custom test cases in problem creation.

Verdict Enum and Test Case Model

import uuid
from dataclasses import dataclass, field
from enum import Enum, auto

class Verdict(Enum):
    """Judging outcome for a submission; the value is the short display code."""

    ACCEPTED = "AC"                # every test case passed
    WRONG_ANSWER = "WA"            # output differed from the expected answer
    TIME_LIMIT_EXCEEDED = "TLE"    # exceeded the wall-clock/CPU budget
    MEMORY_LIMIT_EXCEEDED = "MLE"  # exceeded the memory cap
    RUNTIME_ERROR = "RE"           # crashed with a non-zero exit code
    COMPILATION_ERROR = "CE"       # source failed to compile
    PENDING = "PE"                 # submitted but not yet judged

@dataclass
class TestCase:
    """A single input / expected-output pair belonging to a problem."""

    test_id: str
    input_data: str
    expected_output: str

@dataclass
class Problem:
    """A judged problem: metadata, per-run resource limits, and its test cases.

    ``test_cases`` uses ``field(default_factory=list)`` so every instance gets
    its own fresh list (the previous ``= None`` sentinel worked but is the
    classic mutable-default workaround). Passing ``test_cases=None`` explicitly
    is still coerced to an empty list for backward compatibility.
    """

    problem_id: str
    title: str
    time_limit_s: float = 2.0   # wall-clock budget per test case, seconds
    mem_limit_mb: int = 256     # memory cap per test case, megabytes
    test_cases: list = field(default_factory=list)

    def __post_init__(self):
        # Preserve the old contract: an explicit None still becomes
        # an independent empty list.
        if self.test_cases is None:
            self.test_cases = []

@dataclass
class Submission:
    """One user submission plus the judging result filled in after execution."""

    submission_id: str
    problem_id: str
    user_id: str
    language: str
    code: str
    # Result fields -- populated by the judge once the run finishes.
    verdict: Verdict = Verdict.PENDING
    runtime_ms: float = 0
    memory_mb: float = 0
    error_message: str = ""

Sandboxed Executor

import subprocess
import tempfile
import os
import resource
import time

# Per-language toolchain settings. Placeholders expanded at run time:
#   {file}   -> path of the written source file
#   {binary} -> compiled executable path (compiled languages only)
#   {dir}    -> sandbox temp directory (used as the Java classpath)
# "source_name" overrides the base name of the source file. javac requires a
# public class named Main to live in Main.java, so Java cannot use the
# default "solution" base name; executors should read it with
# config.get("source_name", "solution").
LANGUAGE_CONFIG = {
    "python": {
        "ext":     ".py",
        "compile": None,          # interpreted: no compile step
        "run":     ["python3", "{file}"],
    },
    "java": {
        "ext":         ".java",
        "source_name": "Main",    # file name must match the public class
        "compile":     ["javac", "{file}"],
        "run":         ["java", "-cp", "{dir}", "Main"],
    },
    "cpp": {
        "ext":     ".cpp",
        "compile": ["g++", "-O2", "-o", "{binary}", "{file}"],
        "run":     ["{binary}"],
    },
}

class SandboxedExecutor:
    """Compiles and runs one piece of untrusted code against one test case.

    Limits are enforced two ways: ``setrlimit`` in the child process (via
    ``preexec_fn``) for memory and CPU seconds, plus a wall-clock timeout
    enforced by the parent through ``subprocess.run(timeout=...)``.

    NOTE(review): ``resource`` and ``preexec_fn`` are POSIX-only, and
    ``preexec_fn`` is unsafe in multi-threaded parents -- production judges
    should use container/cgroup isolation instead.
    """

    def __init__(self, time_limit_s: float = 2.0,
                 mem_limit_mb: int = 256,
                 output_limit_bytes: int = 65536):
        self.time_limit    = time_limit_s                 # seconds, wall clock
        self.mem_limit     = mem_limit_mb * 1024 * 1024   # bytes, address space
        self.output_limit  = output_limit_bytes           # max stdout bytes

    def run_test(self, code: str, language: str,
                 input_data: str, expected: str) -> dict:
        """Compile (if needed) and run *code* on *input_data*.

        Returns a dict with "verdict" always present, plus "runtime_ms",
        "error", or "actual" depending on the outcome.
        """
        config = LANGUAGE_CONFIG.get(language)
        if not config:
            return {"verdict": Verdict.COMPILATION_ERROR,
                    "error":   f"Unsupported language: {language}"}

        with tempfile.TemporaryDirectory() as tmpdir:
            # Java requires the file name to match the public class (Main),
            # so the config may override the default base name.
            base     = config.get("source_name", "solution")
            filename = os.path.join(tmpdir, base + config["ext"])
            binary   = os.path.join(tmpdir, "solution")

            with open(filename, "w") as f:
                f.write(code)

            compile_error = self._compile(config, filename, binary, tmpdir)
            if compile_error is not None:
                return compile_error

            return self._execute(config, filename, binary, tmpdir,
                                 input_data, expected)

    def _compile(self, config, filename, binary, tmpdir):
        """Run the compile step; return an error dict, or None when there is
        nothing to compile or compilation succeeded."""
        if not config["compile"]:
            return None
        compile_cmd = [c.format(file=filename, binary=binary, dir=tmpdir)
                       for c in config["compile"]]
        result = subprocess.run(
            compile_cmd, capture_output=True, text=True, timeout=30
        )
        if result.returncode != 0:
            return {"verdict": Verdict.COMPILATION_ERROR,
                    "error":   result.stderr[:500]}
        return None

    def _execute(self, config, filename, binary, tmpdir,
                 input_data, expected):
        """Run the program under resource limits and map the outcome to a verdict."""
        run_cmd = [c.format(file=filename, binary=binary, dir=tmpdir)
                   for c in config["run"]]

        start = time.monotonic()
        try:
            proc = subprocess.run(
                run_cmd,
                input=input_data,
                capture_output=True,
                text=True,
                timeout=self.time_limit + 0.5,  # small scheduling buffer
                preexec_fn=self._set_limits
            )
        except subprocess.TimeoutExpired:
            return {"verdict": Verdict.TIME_LIMIT_EXCEEDED,
                    "runtime_ms": self.time_limit * 1000}
        elapsed_ms = (time.monotonic() - start) * 1000

        # The subprocess timeout carries a 0.5 s buffer, so a run can finish
        # slightly over the real limit; classify that as TLE as well.
        if elapsed_ms > self.time_limit * 1000:
            return {"verdict": Verdict.TIME_LIMIT_EXCEEDED,
                    "runtime_ms": elapsed_ms}

        if proc.returncode != 0:
            # Heuristic MLE detection (previously unreachable): hitting
            # RLIMIT_AS makes CPython raise MemoryError, while native code is
            # typically killed by the kernel (SIGKILL -> returncode -9).
            if "MemoryError" in proc.stderr or proc.returncode == -9:
                return {"verdict": Verdict.MEMORY_LIMIT_EXCEEDED,
                        "runtime_ms": elapsed_ms,
                        "error":   proc.stderr[:200]}
            return {"verdict": Verdict.RUNTIME_ERROR,
                    "error":   proc.stderr[:200]}

        # Enforce the output cap, which was stored but never checked before.
        if len(proc.stdout) > self.output_limit:
            return {"verdict": Verdict.WRONG_ANSWER,
                    "runtime_ms": elapsed_ms,
                    "error":   "Output limit exceeded"}

        actual = proc.stdout.strip()
        if actual == expected.strip():
            return {"verdict": Verdict.ACCEPTED, "runtime_ms": elapsed_ms}
        return {"verdict": Verdict.WRONG_ANSWER,
                "runtime_ms": elapsed_ms,
                "actual":     actual[:200]}

    def _set_limits(self):
        """Applied in the child process between fork and exec (preexec_fn).

        NOTE(review): RLIMIT_AS caps *virtual* address space, which the JVM
        over-reserves at startup -- Java likely needs a larger limit or a
        cgroup-based cap; confirm before relying on this for Java.
        RLIMIT_NPROC (fork-bomb protection) is not set here; it is a per-user
        limit that would also constrain the judge process itself.
        """
        resource.setrlimit(resource.RLIMIT_AS,
                           (self.mem_limit, self.mem_limit))
        # CPU-seconds cap one second above the wall-clock limit, so the
        # parent's wall-clock timeout normally fires first.
        resource.setrlimit(resource.RLIMIT_CPU,
                           (int(self.time_limit) + 1,
                            int(self.time_limit) + 1))

Judge Service

class JudgeService:
    """In-memory judge facade: stores problems, accepts submissions, judges
    them synchronously, and exposes submission lookup."""

    def __init__(self):
        self.problems:    dict[str, Problem]    = {}
        self.submissions: dict[str, Submission] = {}
        self.executor     = SandboxedExecutor()  # default-limit executor

    def add_problem(self, problem: Problem) -> None:
        """Register (or replace) a problem under its id."""
        self.problems[problem.problem_id] = problem

    def submit(self, problem_id: str, user_id: str,
               language: str, code: str) -> Submission:
        """Judge *code* against the problem's test cases, stopping at the
        first non-accepted verdict, and return the stored Submission.

        Raises ValueError for an unknown problem id.

        NOTE(review): a problem with zero test cases leaves the verdict at
        PENDING -- confirm whether that should be an error instead.
        """
        problem = self.problems.get(problem_id)
        if problem is None:
            raise ValueError(f"Problem {problem_id} not found")

        submission = Submission(
            submission_id=str(uuid.uuid4())[:8],
            problem_id=problem_id,
            user_id=user_id,
            language=language,
            code=code,
        )
        self.submissions[submission.submission_id] = submission

        # Fresh executor configured with this problem's limits.
        runner = SandboxedExecutor(
            time_limit_s=problem.time_limit_s,
            mem_limit_mb=problem.mem_limit_mb,
        )

        for case in problem.test_cases:
            outcome = runner.run_test(code, language,
                                      case.input_data, case.expected_output)
            submission.verdict       = outcome["verdict"]
            submission.runtime_ms    = outcome.get("runtime_ms", 0)
            submission.error_message = outcome.get("error",
                                                   outcome.get("actual", ""))
            if submission.verdict is not Verdict.ACCEPTED:
                break  # short-circuit on the first failing test case

        return submission

    def get_submission(self, submission_id: str) -> Submission | None:
        """Look up a submission by id; None when unknown."""
        return self.submissions.get(submission_id)

Scaling Architecture

  • Async execution: submissions are queued to Kafka. Judge worker pool pulls from the queue and executes in parallel. Results written to a database; clients poll or receive WebSocket push.
  • Containerized sandbox: each submission runs in a Docker container with cgroup limits (CPU, memory, disk I/O) instead of setrlimit. This isolates submissions from each other and from the host.
  • Worker pool auto-scaling: submission rate drives worker count. During contests, scale to hundreds of workers.
  • Test case storage: test cases stored in S3; workers fetch on demand and cache locally.

Security Considerations

  • Process isolation: containers with network disabled, read-only filesystem, no root.
  • Output size limit: truncate stdout at 64 KB to prevent memory exhaustion via print.
  • Syscall filtering: seccomp profile blocks dangerous syscalls (fork-bomb prevention, socket creation).
  • Separate judge network: workers on an isolated subnet with no internet access.

Interview Extensions

How would you prevent fork bombs and infinite loops?

Fork bombs: set RLIMIT_NPROC to limit child processes. Infinite loops: enforce a CPU time limit via RLIMIT_CPU (kills the process if it exceeds N CPU-seconds) and a wall-clock timeout via subprocess.run(timeout=). The resource limits (RLIMIT_NPROC, RLIMIT_CPU) are applied in the child process before exec via preexec_fn=_set_limits, while the wall-clock timeout is enforced by the parent process through subprocess.run's timeout parameter.

How do you handle special judges (problems with multiple valid outputs)?

Instead of string comparison, run a custom checker program that takes (input, expected_output, actual_output) as arguments and returns 0 for accepted, non-zero for wrong. The JudgeService calls the checker executable instead of comparing strings. This supports floating-point tolerance, multiple correct answers, and interactive problems.

{
  "@context": "https://schema.org",
  "@type": "FAQPage",
  "mainEntity": [
    {
      "@type": "Question",
      "name": "What are the main verdicts in an online code judge?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Accepted (AC): all test cases pass. Wrong Answer (WA): output does not match expected. Time Limit Exceeded (TLE): process exceeds wall-clock or CPU time limit. Memory Limit Exceeded (MLE): process exceeds memory limit. Runtime Error (RE): non-zero exit code (segfault, exception). Compilation Error (CE): code fails to compile. Each verdict terminates testing at the first failure (short-circuit evaluation)."
      }
    },
    {
      "@type": "Question",
      "name": "How do you sandbox code execution in an online judge?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Two levels: (1) OS-level: use setrlimit() to set CPU time (RLIMIT_CPU), virtual memory (RLIMIT_AS), process count (RLIMIT_NPROC), and file size limits before exec. (2) Container-level: run each submission in a Docker container with cgroup limits, disabled network, read-only filesystem, and a seccomp syscall filter blocking dangerous calls (socket, fork beyond limit). Container isolation is stronger but has higher startup latency (~500ms)."
      }
    },
    {
      "@type": "Question",
      "name": "How do you scale an online judge to handle contest traffic spikes?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Decouple submission ingestion from execution: API accepts submissions and enqueues to Kafka. Worker pool pulls from Kafka and executes. Workers are stateless and horizontally scalable. During contests, auto-scale workers based on queue depth. Use pre-warmed containers to reduce startup latency. Test cases are cached locally on workers from S3 to avoid repeated downloads."
      }
    },
    {
      "@type": "Question",
      "name": "How do you handle problems with multiple valid outputs (special judge)?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Instead of string comparison, run a custom checker program that receives (input, expected_output, actual_output) and returns 0 for accepted or non-zero for wrong answer. The JudgeService calls the checker binary with these three arguments instead of comparing strings directly. This supports: floating-point answers within tolerance, multiple correct orderings, shortest-path problems with multiple optimal paths."
      }
    },
    {
      "@type": "Question",
      "name": "What is a fork bomb and how do you prevent it in a code judge?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "A fork bomb exploits unlimited process creation: import os; [os.fork() for _ in range(999)]. This creates 2^999 processes, exhausting system resources. Prevention: set RLIMIT_NPROC to limit child processes (e.g., 64). In container mode, set pids.max in the cgroup. Additionally, run submissions in a seccomp profile that restricts or counts fork/clone syscalls."
      }
    }
  ]
}

Asked at: Databricks Interview Guide

Asked at: Coinbase Interview Guide

Asked at: Atlassian Interview Guide

Asked at: Stripe Interview Guide

Scroll to Top