1
2
3
4
5
6
7
8import { randomUUID } from 'node:crypto';
9
10import { InMemoryEventStore } from '@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js';
11import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
12import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
13import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js';
14import { log } from 'apify';
15import type { Request, Response } from 'express';
16import express from 'express';
17
18import { chargeMessageRequest } from './billing.js';
19import { getMcpServer as getMCPServerWithCommand } from './mcp.js';
20
/**
 * Factory producing the MCP server that gets connected to each newly initialized session.
 * It is `null` until `startServer` assigns it, so request handlers must check it first.
 */
let getMcpServer: null | (() => Promise<McpServer>) = null;

/**
 * Active transports keyed by MCP session ID.
 *
 * Keys are looked up using the client-supplied `mcp-session-id` header, so the map is
 * created without a prototype. On a plain `{}` a header value such as `constructor` or
 * `toString` would resolve to an inherited `Object.prototype` member, pass the
 * truthiness checks in the handlers and be mistaken for a live transport.
 */
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = Object.create(null) as {
    [sessionId: string]: StreamableHTTPServerTransport;
};
25
26
27
28
29
30
31
/**
 * Handles `POST /mcp`.
 *
 * - A request carrying a known `mcp-session-id` is forwarded to that session's transport.
 * - A request without a session ID whose body is an initialize request creates a new
 *   transport, connects a fresh MCP server to it and lets the transport answer.
 * - Anything else is rejected with a 400 JSON-RPC error.
 *
 * Any exception is logged and, if no response has been started, answered with a 500
 * JSON-RPC error.
 */
async function mcpPostHandler(req: Request, res: Response) {
    // Without a server factory there is nothing to connect a session to.
    const createServer = getMcpServer;
    if (createServer === null) {
        res.status(500).json({
            jsonrpc: '2.0',
            error: { code: -32000, message: 'Server not initialized' },
            id: null,
        });
        return;
    }

    const sessionId = req.headers['mcp-session-id'] as string | undefined;
    log.info('Received MCP request', { sessionId: sessionId || null, body: req.body });

    try {
        // Existing session: reuse its transport.
        const knownTransport = sessionId ? transports[sessionId] : undefined;
        if (knownTransport) {
            await knownTransport.handleRequest(req, res, req.body);
            return;
        }

        // Only a session-less initialize request may open a new session.
        if (sessionId || !isInitializeRequest(req.body)) {
            res.status(400).json({
                jsonrpc: '2.0',
                error: { code: -32000, message: 'Bad Request: No valid session ID provided' },
                id: null,
            });
            return;
        }

        const freshTransport: StreamableHTTPServerTransport = new StreamableHTTPServerTransport({
            sessionIdGenerator: () => randomUUID(),
            eventStore: new InMemoryEventStore(),
            // The transport is registered only once its session ID has been generated.
            onsessioninitialized: (initializedSessionId) => {
                log.info('Session initialized', { sessionId: initializedSessionId });
                transports[initializedSessionId] = freshTransport;
            },
        });

        // Charging is fire-and-forget: a billing failure is logged, never propagated.
        freshTransport.onmessage = (message) => {
            chargeMessageRequest(message as { method: string }).catch((error) => {
                log.error('Error charging for message request:', {
                    error,
                    sessionId: freshTransport.sessionId || null,
                });
            });
        };

        // Drop the session from the registry when its transport closes.
        freshTransport.onclose = () => {
            const closedId = freshTransport.sessionId;
            if (!closedId || !transports[closedId]) {
                return;
            }
            log.info('Transport closed', { sessionId: closedId });
            delete transports[closedId];
        };

        // Connect the server before handling the request so the response can flow back.
        const mcpServer = await createServer();
        await mcpServer.connect(freshTransport);
        await freshTransport.handleRequest(req, res, req.body);
    } catch (error) {
        log.error('Error handling MCP request:', { error, sessionId: sessionId || null });
        if (res.headersSent) {
            return;
        }
        res.status(500).json({
            jsonrpc: '2.0',
            error: { code: -32603, message: 'Internal server error' },
            id: null,
        });
    }
}
132
133
134
135
136
/**
 * Handles `GET /mcp`, which hands the request to the session's transport to serve the stream.
 *
 * Responds with 400 when the `mcp-session-id` header is missing or does not match a
 * registered transport. A `last-event-id` header is logged as a client reconnect.
 *
 * Failures inside the transport are caught here: a rejection escaping this async handler
 * would otherwise go unhandled and leave the request without a response. If no response
 * has been started yet, the client gets a 500.
 */
async function mcpGetHandler(req: Request, res: Response) {
    const sessionId = req.headers['mcp-session-id'] as string | undefined;
    const transport = sessionId ? transports[sessionId] : undefined;
    if (!sessionId || !transport) {
        res.status(400).send('Invalid or missing session ID');
        return;
    }

    const lastEventId = req.headers['last-event-id'] as string | undefined;
    if (lastEventId) {
        log.info('Client reconnecting', {
            lastEventId,
            sessionId,
        });
    } else {
        log.info('Establishing new SSE stream', {
            sessionId,
        });
    }

    try {
        await transport.handleRequest(req, res);
    } catch (error) {
        log.error('Error handling SSE stream request:', {
            error,
            sessionId,
        });
        // Headers may already be sent once streaming has started; then only log.
        if (!res.headersSent) {
            res.status(500).send('Error establishing SSE stream');
        }
    }
}
159
160
161
162
163
/**
 * Handles `DELETE /mcp`, a client request to terminate its session.
 *
 * Responds with 400 when the `mcp-session-id` header is missing or unknown. Otherwise
 * the request is delegated to the session's transport. Errors are logged and answered
 * with a 500 if no response has been started.
 */
async function mcpDeleteHandler(req: Request, res: Response) {
    const sessionId = req.headers['mcp-session-id'] as string | undefined;
    const sessionTransport = sessionId ? transports[sessionId] : undefined;

    if (!sessionId || !sessionTransport) {
        res.status(400).send('Invalid or missing session ID');
        return;
    }

    log.info('Received session termination request', { sessionId });

    try {
        await sessionTransport.handleRequest(req, res);
    } catch (error) {
        log.error('Error handling session termination:', { error });
        if (res.headersSent) {
            return;
        }
        res.status(500).send('Error processing session termination');
    }
}
187
188
189
190
191
192
193
/**
 * Starts the MCP HTTP server.
 *
 * Installs the MCP server factory used by the `/mcp` handlers, registers the routes,
 * starts listening on `serverPort` and installs a SIGINT handler that closes every
 * open transport before exiting the process.
 *
 * @param options.serverPort - TCP port the Express app listens on.
 * @param options.command - Command passed to the MCP server factory from `./mcp.js`.
 */
export async function startServer(options: { serverPort: number; command: string[] }): Promise<void> {
    log.info('Starting MCP HTTP Server', {
        serverPort: options.serverPort,
        command: options.command,
    });
    const { serverPort, command } = options;

    // Each new session gets its own MCP server built from the same command.
    getMcpServer = async () => getMCPServerWithCommand(command);

    const app = express();

    // Redirect favicon requests to the Apify favicon.
    app.get('/favicon.ico', (_req: Request, res: Response) => {
        res.writeHead(301, { Location: 'https://apify.com/favicon.ico' });
        res.end();
    });

    // The root path only answers requests carrying the readiness-probe header.
    app.get('/', (req: Request, res: Response) => {
        if (req.headers['x-apify-container-server-readiness-probe']) {
            console.log('Readiness probe');
            res.end('ok\n');
            return;
        }
        res.status(404).end();
    });

    // Proxy the OAuth authorization server metadata from the Apify API.
    app.get('/.well-known/oauth-authorization-server', async (_req: Request, res: Response) => {
        try {
            const response = await fetch(`https://api.apify.com/.well-known/oauth-authorization-server`);
            if (!response.ok) {
                log.error('Failed to fetch OAuth authorization server metadata', {
                    status: response.status,
                });
                res.status(502).json({ error: 'Failed to fetch OAuth authorization server metadata' });
                return;
            }
            const data: unknown = await response.json();
            res.status(200).json(data);
        } catch (error) {
            // A network or JSON parsing failure must still produce a response; an
            // unhandled rejection here would leave the request hanging.
            log.error('Error fetching OAuth authorization server metadata:', {
                error,
            });
            if (!res.headersSent) {
                res.status(502).json({ error: 'Failed to fetch OAuth authorization server metadata' });
            }
        }
    });

    // JSON body parsing is registered here, after the routes above that do not need it.
    app.use(express.json());

    app.post('/mcp', mcpPostHandler);
    app.get('/mcp', mcpGetHandler);
    app.delete('/mcp', mcpDeleteHandler);

    app.listen(serverPort, () => {
        log.info(`MCP HTTP Server listening on port ${serverPort}`);
    });

    // On SIGINT, close every open transport, then exit.
    process.on('SIGINT', async () => {
        log.info('Shutting down server...');

        for (const sessionId of Object.keys(transports)) {
            const transport = transports[sessionId];
            // Skip sessions that were removed while earlier ones were being closed.
            if (!transport) {
                continue;
            }
            try {
                log.info(`Closing transport for session ${sessionId}`);
                await transport.close();
                delete transports[sessionId];
            } catch (error) {
                log.error(`Error closing transport for session ${sessionId}:`, {
                    error,
                });
            }
        }
        log.info('Server shutdown complete');
        process.exit(0);
    });
}