Forked from soumyajit7038/python-tools
Project Files
src / utils / safePaths.ts
import { lstat, realpath, stat } from "node:fs/promises";
import path from "node:path";
export const MAX_PATH_LENGTH = 500;
export const MAX_ARG_LENGTH = 500;
export const MAX_WINDOW_TITLE_LENGTH = 120;
export const MAX_ARG_COUNT = 20;
export const MAX_FILE_READ_BYTES = 300_000;
export const MAX_FILE_WRITE_BYTES = 500_000;
export const MAX_DIRECTORY_ITEMS = 500;
export const MAX_EDIT_REPLACEMENTS = 100;
const FORBIDDEN_WINDOW_TITLE_CHARS = /["&|;<>`$\n\r]/;
const WORKSPACE_ENV_VAR = "PYTHON_TOOLS_WORKSPACE";
export function validateNoNewline(value: string, fieldName: string): void {
if (value.includes("\n") || value.includes("\r")) {
throw new Error(`${fieldName} must not contain newline characters.`);
}
}
export function validateNoNewlines(value: string, fieldName: string): void {
validateNoNewline(value, fieldName);
}
export function validatePathInput(value: string, fieldName: string): void {
if (value.trim().length === 0) {
throw new Error(`${fieldName} is required.`);
}
if (value.length > MAX_PATH_LENGTH) {
throw new Error(`${fieldName} must be ${MAX_PATH_LENGTH} characters or fewer.`);
}
validateNoNewline(value, fieldName);
}
export async function validatePythonFilePath(filePath: string): Promise<string> {
validatePathInput(filePath, "filePath");
const absolutePath = path.resolve(filePath);
if (path.extname(absolutePath).toLowerCase() !== ".py") {
throw new Error("filePath must point to a .py file.");
}
const fileStats = await stat(absolutePath).catch(() => null);
if (fileStats === null) {
throw new Error(`Python file does not exist: ${absolutePath}`);
}
if (!fileStats.isFile()) {
throw new Error(`filePath is not a file: ${absolutePath}`);
}
return absolutePath;
}
export async function validateWorkingDirectory(cwd?: string): Promise<string | undefined> {
if (cwd === undefined) {
return undefined;
}
validatePathInput(cwd, "cwd");
const absoluteCwd = path.resolve(cwd);
const cwdStats = await stat(absoluteCwd).catch(() => null);
if (cwdStats === null) {
throw new Error(`Working directory does not exist: ${absoluteCwd}`);
}
if (!cwdStats.isDirectory()) {
throw new Error(`cwd is not a directory: ${absoluteCwd}`);
}
return absoluteCwd;
}
export function validateArgs(args?: string[]): string[] {
if (args === undefined) {
return [];
}
if (args.length > MAX_ARG_COUNT) {
throw new Error(`args must include at most ${MAX_ARG_COUNT} items.`);
}
for (const argument of args) {
if (argument.length > MAX_ARG_LENGTH) {
throw new Error(`Each arg must be ${MAX_ARG_LENGTH} characters or fewer.`);
}
validateNoNewline(argument, "args item");
}
return args;
}
export function validateWindowTitle(windowTitle?: string): string {
const resolvedTitle = windowTitle ?? "Python Tools";
if (resolvedTitle.trim().length === 0) {
throw new Error("windowTitle must not be empty.");
}
if (resolvedTitle.length > MAX_WINDOW_TITLE_LENGTH) {
throw new Error(`windowTitle must be ${MAX_WINDOW_TITLE_LENGTH} characters or fewer.`);
}
if (FORBIDDEN_WINDOW_TITLE_CHARS.test(resolvedTitle)) {
throw new Error("windowTitle contains forbidden characters.");
}
return resolvedTitle;
}
export function getWorkspaceRoot(): string {
const workspaceFromEnvironment = process.env[WORKSPACE_ENV_VAR]?.trim();
const workspaceRoot = workspaceFromEnvironment !== undefined && workspaceFromEnvironment.length > 0
? workspaceFromEnvironment
: process.cwd();
validatePathInput(workspaceRoot, WORKSPACE_ENV_VAR);
const absoluteWorkspaceRoot = path.resolve(workspaceRoot);
validatePathInput(absoluteWorkspaceRoot, "workspaceRoot");
return absoluteWorkspaceRoot;
}
export function resolveSafePath(inputPath: string, fieldName = "path"): string {
validatePathInput(inputPath, fieldName);
const workspaceRoot = getWorkspaceRoot();
const absolutePath = path.isAbsolute(inputPath)
? path.resolve(inputPath)
: path.resolve(workspaceRoot, inputPath);
validatePathInput(absolutePath, fieldName);
ensureInsideWorkspace(absolutePath, workspaceRoot, fieldName);
return absolutePath;
}
export async function ensurePythonFile(
filePath: string,
options: { mustExist?: boolean } = {},
): Promise<string> {
const resolvedPath = resolveSafePath(filePath, "filePath");
if (path.extname(resolvedPath).toLowerCase() !== ".py") {
throw new Error("filePath must point to a .py file.");
}
if (options.mustExist === false) {
return resolvedPath;
}
const fileStats = await stat(resolvedPath).catch(() => null);
if (fileStats === null) {
throw new Error(`Python file does not exist: ${resolvedPath}`);
}
if (!fileStats.isFile()) {
throw new Error(`filePath is not a file: ${resolvedPath}`);
}
const fileLstat = await lstat(resolvedPath).catch(() => null);
if (fileLstat?.isSymbolicLink()) {
throw new Error("Symbolic links are not allowed for filePath.");
}
await ensureCanonicalPathInsideWorkspace(resolvedPath, "filePath");
return resolvedPath;
}
export async function ensureDirectory(directoryPath: string): Promise<string> {
const resolvedPath = resolveSafePath(directoryPath, "directoryPath");
const directoryStats = await stat(resolvedPath).catch(() => null);
if (directoryStats === null) {
throw new Error(`Directory does not exist: ${resolvedPath}`);
}
if (!directoryStats.isDirectory()) {
throw new Error(`directoryPath is not a directory: ${resolvedPath}`);
}
const directoryLstat = await lstat(resolvedPath).catch(() => null);
if (directoryLstat?.isSymbolicLink()) {
throw new Error("Symbolic links are not allowed for directoryPath.");
}
await ensureCanonicalPathInsideWorkspace(resolvedPath, "directoryPath");
return resolvedPath;
}
export async function ensureCanonicalPathInsideWorkspace(pathToCheck: string, fieldName: string): Promise<void> {
const workspaceRoot = getWorkspaceRoot();
const canonicalWorkspaceRoot = await resolveCanonicalPath(workspaceRoot);
const canonicalPathToCheck = await resolveCanonicalPath(pathToCheck);
ensureInsideWorkspace(canonicalPathToCheck, canonicalWorkspaceRoot, fieldName);
}
function ensureInsideWorkspace(absolutePath: string, workspaceRoot: string, fieldName: string): void {
const relativePath = path.relative(workspaceRoot, absolutePath);
if (relativePath === "") {
return;
}
if (relativePath.startsWith("..") || path.isAbsolute(relativePath)) {
throw new Error(`${fieldName} must stay inside the configured workspace root: ${workspaceRoot}`);
}
}
async function resolveCanonicalPath(inputPath: string): Promise<string> {
return await realpath(inputPath).catch(() => path.resolve(inputPath));
}