Skip to content

Commit f156481

Browse files
Implement plural mcp functionality (#1408)
1 parent fd8015a commit f156481

23 files changed

+481
-3
lines changed

apps/api/lib/api/application.ex

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ defmodule Api.Application do
88
ApiWeb.Plugs.MetricsExporter.setup()
99
children = [
1010
ApiWeb.Endpoint,
11+
{Bandit, plug: Core.MCP.Router, port: 3000},
1112
{FT.K8S.TrafficDrainHandler, Core.drain_config()},
1213
{Cluster.Supervisor, [topologies, [name: Api.ClusterSupervisor]]},
1314
]

apps/core/lib/core/mcp/jwt.ex

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
defmodule Core.MCP.Jwt do
2+
use Joken.Config
3+
use Nebulex.Caching
4+
5+
@alg "ES256"
6+
@ttl :timer.minutes(15)
7+
8+
def exchange(jwt) do
9+
with {:ok, %{"keys" => [jws | _]}} <- get_jwks(),
10+
do: verify_and_validate(jwt, signer(jws))
11+
end
12+
13+
def signer(jws), do: Joken.Signer.create(@alg, jws)
14+
15+
@decorate cacheable(cache: Core.Cache, key: :console_jws, opts: [ttl: @ttl])
16+
defp get_jwks() do
17+
Path.join([Core.conf(:console_url), "mcp", ".well-known", "jwks.json"])
18+
|> HTTPoison.get()
19+
|> case do
20+
{:ok, %HTTPoison.Response{status_code: 200, body: body}} -> Jason.decode(body)
21+
_ -> {:error, "failed to fetch jwks"}
22+
end
23+
end
24+
end

apps/core/lib/core/mcp/router.ex

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
defmodule Core.MCP.Router do
2+
use Plug.Router
3+
alias Plug.Conn
4+
alias Core.MCP.Jwt
5+
6+
plug Plug.Parsers,
7+
parsers: [:urlencoded, :json],
8+
pass: ["text/*"],
9+
json_decoder: Jason
10+
11+
plug :match
12+
plug :ensure_session_id
13+
plug :authorize
14+
plug :dispatch
15+
16+
forward "/sse", to: SSE.ConnectionPlug
17+
forward "/message", to: SSE.ConnectionPlug
18+
19+
match _ do
20+
send_resp(conn, 404, "Not found")
21+
end
22+
23+
# Middleware to ensure session ID exists
24+
def ensure_session_id(%Conn{query_params: %{"sessionId" => id}} = conn, _) when is_binary(id),
25+
do: conn
26+
def ensure_session_id(conn, _opts) do
27+
session_id = Base.encode16(:crypto.strong_rand_bytes(8), case: :lower)
28+
put_in(conn.query_params["sessionId"], session_id)
29+
end
30+
31+
def authorize(conn, _) do
32+
with ["Bearer " <> token | _] <- get_req_header(conn, "authorization"),
33+
true <- authorized?(token) do
34+
conn
35+
else
36+
_ ->
37+
send_resp(conn, 403, "unauthorized")
38+
|> halt()
39+
end
40+
end
41+
42+
@groups ~w(sales ops)
43+
44+
defp authorized?(token) do
45+
case Jwt.exchange(token) do
46+
{:ok, %{"admin" => true}} -> true
47+
{:ok, %{"groups" => groups}} -> Enum.any?(groups, & &1 in @groups)
48+
_ -> false
49+
end
50+
end
51+
end

apps/core/lib/core/mcp/server.ex

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
defmodule Core.MCP.Server do
2+
use MCPServer
3+
alias Core.MCP.Tools.{
4+
Account,
5+
Enterprise,
6+
CloudConsole,
7+
RemoveEnterprise
8+
}
9+
10+
@tools [Account, Enterprise, CloudConsole, RemoveEnterprise]
11+
@by_name Map.new(@tools, & {&1.name(), &1})
12+
13+
@protocol_version "2024-11-05"
14+
15+
@impl true
16+
def handle_ping(request_id) do
17+
{:ok, %{jsonrpc: "2.0", id: request_id, result: %{}}}
18+
end
19+
20+
@impl true
21+
def handle_initialize(request_id, params) do
22+
case validate_protocol_version(params["protocolVersion"]) do
23+
:ok ->
24+
{:ok,
25+
%{
26+
jsonrpc: "2.0",
27+
id: request_id,
28+
result: %{
29+
protocolVersion: @protocol_version,
30+
capabilities: %{
31+
tools: %{
32+
listChanged: true
33+
}
34+
},
35+
serverInfo: %{
36+
name: "Plural MCP Server",
37+
version: "0.1.0"
38+
}
39+
}
40+
}}
41+
42+
{:error, reason} ->
43+
{:error, reason}
44+
end
45+
end
46+
47+
@impl true
48+
def handle_list_tools(request_id, _params) do
49+
tools = Enum.map(@tools, & %{
50+
name: &1.name(),
51+
description: &1.description(),
52+
inputSchema: &1.schema()
53+
})
54+
{:ok, %{jsonrpc: "2.0", id: request_id, result: %{tools: tools}}}
55+
end
56+
57+
@impl true
58+
def handle_call_tool(request_id, %{"name" => name} = params) do
59+
with {:tool, t} when not is_nil(t) <- {:tool, @by_name[name]},
60+
{:ok, result} <- t.invoke(params["arguments"]) do
61+
{:ok, %{jsonrpc: "2.0", id: request_id, result: %{content: [%{type: "text", text: result}]}}}
62+
else
63+
{:tool, _} ->
64+
{:error, %{
65+
jsonrpc: "2.0",
66+
id: request_id,
67+
error: %{
68+
code: -32601,
69+
message: "Method not found",
70+
data: %{
71+
name: "no tool with name #{name}"
72+
}
73+
}
74+
}}
75+
{:error, err} ->
76+
{:ok, %{jsonrpc: "2.0", id: request_id, result: %{isError: true, content: [%{type: "text", text: err}]}}}
77+
end
78+
end
79+
end

apps/core/lib/core/mcp/tool.ex

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
defmodule Core.MCP.Tool do
2+
@doc """
3+
The mcp server tool description
4+
"""
5+
@callback description() :: binary
6+
7+
@doc """
8+
jsonapi format input schema
9+
"""
10+
@callback schema() :: map
11+
12+
@doc """
13+
name of the tool
14+
"""
15+
@callback name() :: binary
16+
17+
@doc """
18+
actually calls the tool
19+
"""
20+
@callback invoke(args :: map) :: {:ok, binary} | {:error, binary}
21+
end
+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
defmodule Core.MCP.Tools.Account do
2+
@behaviour Core.MCP.Tool
3+
alias Core.Repo
4+
alias Core.Services.Users
5+
alias Core.Schema.User
6+
7+
def name(), do: "account"
8+
9+
def description(), do: "Fetches the full Plural account given a user email, also including subscription and plan details"
10+
11+
def schema() do
12+
%{
13+
type: "object",
14+
required: ["email"],
15+
properties: %{
16+
email: %{
17+
type: "string",
18+
description: "The email of the user whose account you want to look up"
19+
}
20+
}
21+
}
22+
end
23+
24+
def invoke(%{"email" => email}) do
25+
with %User{} = user <- Users.get_user_by_email(email) do
26+
user = Repo.preload(user, [account: [subscription: :plan]])
27+
Jason.encode(user)
28+
else
29+
_ -> {:ok, "no user with email #{email}"}
30+
end
31+
end
32+
def invoke(_), do: {:error, "email is required"}
33+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
defmodule Core.MCP.Tools.CloudConsole do
2+
@behaviour Core.MCP.Tool
3+
alias Core.Repo
4+
alias Core.Services.Cloud
5+
alias Core.Schema.{ConsoleInstance}
6+
7+
def name(), do: "console_cluster"
8+
9+
def description(), do: "Fetches the details of a cloud console"
10+
11+
def schema() do
12+
%{
13+
type: "object",
14+
required: ["name"],
15+
properties: %{
16+
name: %{
17+
type: "string",
18+
description: "The name of the console instance"
19+
}
20+
}
21+
}
22+
end
23+
24+
def invoke(%{"name" => name}) do
25+
with %ConsoleInstance{} = console <- Cloud.get_instance_by_name(name),
26+
console = Repo.preload(console, [:cluster, owner: [account: [subscription: :plan]]]) do
27+
Jason.encode(format(console))
28+
else
29+
_ -> {:ok, "no instance with name #{name}"}
30+
end
31+
end
32+
def invoke(_), do: {:error, "email is required"}
33+
34+
defp format(console) do
35+
Map.take(console, ~w(id type name status subdomain url cloud size region cluster owner)a)
36+
end
37+
end
+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
defmodule Core.MCP.Tools.Enterprise do
2+
@behaviour Core.MCP.Tool
3+
alias Core.Services.{Accounts, Payments}
4+
alias Core.Schema.Account
5+
6+
def name(), do: "add_enterprise_plan"
7+
8+
def description(), do: "Adds an account to the main enterprise plan"
9+
10+
def schema() do
11+
%{
12+
type: "object",
13+
required: ["account_id"],
14+
properties: %{
15+
account_id: %{
16+
type: "string",
17+
description: "The account id to add to the enterprise plan. You can fetch the id by first fetching the users account info."
18+
}
19+
}
20+
}
21+
end
22+
23+
def invoke(%{"account_id" => id}) do
24+
with %Account{} = account <- Accounts.get_account(id),
25+
{:ok, _} <- Payments.remove_trial(account),
26+
{:ok, _} <- Payments.setup_enterprise_plan(account.id) do
27+
{:ok, "added the account to the enterprise plan successfully"}
28+
else
29+
_ -> {:ok, "no account with id #{id}"}
30+
end
31+
end
32+
def invoke(_), do: {:error, "account id is required"}
33+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
defmodule Core.MCP.Tools.RemoveEnterprise do
2+
@behaviour Core.MCP.Tool
3+
alias Core.Services.{Accounts, Payments}
4+
alias Core.Schema.Account
5+
6+
def name(), do: "remove_enterprise_plan"
7+
8+
def description(), do: "removes an account to the main enterprise plan"
9+
10+
def schema() do
11+
%{
12+
type: "object",
13+
required: ["account_id"],
14+
properties: %{
15+
account_id: %{
16+
type: "string",
17+
description: "The account id to add to the enterprise plan. You can fetch the id by first querying a users account"
18+
}
19+
}
20+
}
21+
end
22+
23+
def invoke(%{"account_id" => id}) do
24+
with {:account, %Account{} = account} <- {:account, Accounts.get_account(id)},
25+
{:ok, _} <- Payments.remove_enterprise_plan(account.id) do
26+
{:ok, "removed the account from the enterprise plan successfully"}
27+
else
28+
{:account, _} -> {:ok, "no account with id #{id}"}
29+
err -> err
30+
end
31+
end
32+
33+
def invoke(_), do: {:error, "account id is required"}
34+
end

apps/core/lib/core/services/accounts.ex

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ defmodule Core.Services.Accounts do
3232

3333
def get_account!(id), do: Core.Repo.get!(Account, id)
3434

35+
def get_account(id), do: Core.Repo.get(Account, id)
36+
3537
def get_group!(id), do: Core.Repo.get!(Group, id)
3638

3739
def get_group_by_name(aid, name), do: Core.Repo.get_by(Group, name: name, account_id: aid)

apps/core/lib/core/services/payments.ex

+13-2
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,7 @@ defmodule Core.Services.Payments do
675675
defp discount(amount, :yearly), do: round(9 * amount / 10) * 12
676676
defp discount(amount, _), do: amount
677677

678+
@spec setup_enterprise_plan(User.t | binary) :: platform_sub_resp
678679
def setup_enterprise_plan(%User{} = user) do
679680
case Repo.preload(user, [account: [subscription: :plan]]) do
680681
%{account: %Account{} = account} ->
@@ -683,15 +684,25 @@ defmodule Core.Services.Payments do
683684
_ -> setup_enterprise_plan(user.account_id)
684685
end
685686
end
686-
687-
def setup_enterprise_plan(account_id) do
687+
def setup_enterprise_plan(account_id) when is_binary(account_id) do
688688
plan = get_platform_plan_by_name!("Enterprise")
689689

690690
%PlatformSubscription{account_id: account_id}
691691
|> PlatformSubscription.changeset(%{plan_id: plan.id})
692692
|> Core.Repo.insert()
693693
end
694694

695+
@spec remove_enterprise_plan(binary) :: platform_sub_resp
696+
def remove_enterprise_plan(account_id) do
697+
Repo.get_by(PlatformSubscription, account_id: account_id)
698+
|> Repo.preload([:plan])
699+
|> case do
700+
%PlatformSubscription{plan: %PlatformPlan{enterprise: true}} = sub ->
701+
Repo.delete(sub)
702+
_ -> {:error, "account #{account_id} is not on an enterprise plan"}
703+
end
704+
end
705+
695706
@doc """
696707
Creates a plan and associated add-ons. Transactionally creates them in stripe, then
697708
restitches them back into the db.

apps/core/mix.exs

+1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ defmodule Core.MixProject do
7676
{:bamboo, "~> 2.0"},
7777
{:parallax, "~> 1.0"},
7878
{:bourne, "~> 1.1"},
79+
{:mcp_sse, "~> 0.1.0"},
7980
{:flow, "~> 0.15.0"},
8081
{:joken, "~> 2.5.0"},
8182
{:guardian, "~> 1.2.1"},

0 commit comments

Comments
 (0)