mirror of
https://github.com/bitwarden/browser
synced 2025-12-22 03:03:43 +00:00
[PM-7846] Implement a rust based native messaging proxy and IPC system (#9894)
* [PM-7846] Implement a rust based native messaging proxy and IPC system
* Only build desktop_proxy
* Bundle the desktop_proxy file
* Make sys deps optional for the proxy
* Restore accidentally deleted after-sign
* Update native cache to contain dist folder
* Add some test logging
* Native module cache seems very aggressive
* Fix invalid directory
* Fix debug print
* Remove cache force
* Remove cache debug code
* Only log to file in debug builds
* Place the binary in the correct place for mac and make sure it's signed
* Fix platform paths
* Test unsigned appx
* Revert "Test unsigned appx"
This reverts commit e47535440a.
* Fix comment
* Remove logs
* Use debug builds in native code, and test private path on MacOS
* Add connected message
* Update IPC API comments
* Update linux to also use XDG_ dir
* Update main.rs comment
* Improve docs and split some tasks spawned into separate functions
* Update send docs and return number of elements sent
* Mark `listen` as async to ensure it runs in a tokio context, handle errors better
* Add log on client channel closed
* Move binary to MacOS folder, and sign it manually so it gets the correct entitlements
* Fix some review comments
* Run prettier
* Added missing zbus_polkit dep
* Extract magic number and increase it to match spec
* Comment fix
* Use Napi object, combine nativeBinding export, always log to file
* Missed one comment
* Remove unnecessary generics
* Correct comment
* Select only codesigning identities
* Filter certificates
* Also add local dev cert
* Remove log
* Fix package ID
* debug_assert won't run the pop() in release mode
* Better error messages
* Fix review comments
* Remove unnecessary comment
* Update napi generated TS file
* Temporary fix for DDG
This commit is contained in:
102
apps/desktop/desktop_native/core/src/ipc/client.rs
Normal file
102
apps/desktop/desktop_native/core/src/ipc/client.rs
Normal file
@@ -0,0 +1,102 @@
|
||||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use interprocess::local_socket::{
|
||||
tokio::{prelude::*, Stream},
|
||||
GenericFilePath, ToFsName,
|
||||
};
|
||||
use log::{error, info};
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
time::sleep,
|
||||
};
|
||||
|
||||
use crate::ipc::NATIVE_MESSAGING_BUFFER_SIZE;
|
||||
|
||||
pub async fn connect(
|
||||
path: PathBuf,
|
||||
send: tokio::sync::mpsc::Sender<String>,
|
||||
mut recv: tokio::sync::mpsc::Receiver<String>,
|
||||
) {
|
||||
// Keep track of connection failures to make sure we don't leave the process as a zombie
|
||||
let mut connection_failures = 0;
|
||||
|
||||
loop {
|
||||
match connect_inner(&path, &send, &mut recv).await {
|
||||
Ok(()) => return,
|
||||
Err(e) => {
|
||||
connection_failures += 1;
|
||||
if connection_failures >= 20 {
|
||||
error!("Failed to connect to IPC server after 20 attempts: {e}");
|
||||
return;
|
||||
}
|
||||
|
||||
error!("Failed to connect to IPC server: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect_inner(
|
||||
path: &Path,
|
||||
send: &tokio::sync::mpsc::Sender<String>,
|
||||
recv: &mut tokio::sync::mpsc::Receiver<String>,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
info!("Attempting to connect to {}", path.display());
|
||||
|
||||
let name = path.as_os_str().to_fs_name::<GenericFilePath>()?;
|
||||
let mut conn = Stream::connect(name).await?;
|
||||
|
||||
info!("Connected to {}", path.display());
|
||||
|
||||
// This `connected` and the latter `disconnected` messages are the only ones that
|
||||
// are sent from the Rust IPC code and not just forwarded from the desktop app.
|
||||
// As it's only two, we hardcode the JSON values to avoid pulling in a JSON library.
|
||||
send.send("{\"command\":\"connected\"}".to_owned()).await?;
|
||||
|
||||
let mut buffer = vec![0; NATIVE_MESSAGING_BUFFER_SIZE];
|
||||
|
||||
// Listen to IPC messages
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Forward messages to the IPC server
|
||||
msg = recv.recv() => {
|
||||
match msg {
|
||||
Some(msg) => {
|
||||
conn.write_all(msg.as_bytes()).await?;
|
||||
}
|
||||
None => {
|
||||
info!("Client channel closed");
|
||||
break;
|
||||
},
|
||||
}
|
||||
},
|
||||
|
||||
// Forward messages from the IPC server
|
||||
res = conn.read(&mut buffer[..]) => {
|
||||
match res {
|
||||
Err(e) => {
|
||||
error!("Error reading from IPC server: {e}");
|
||||
break;
|
||||
}
|
||||
Ok(0) => {
|
||||
info!("Connection closed");
|
||||
break;
|
||||
}
|
||||
Ok(n) => {
|
||||
let message = String::from_utf8_lossy(&buffer[..n]).to_string();
|
||||
send.send(message).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let _ = send.send("{\"command\":\"disconnected\"}".to_owned()).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
64
apps/desktop/desktop_native/core/src/ipc/mod.rs
Normal file
64
apps/desktop/desktop_native/core/src/ipc/mod.rs
Normal file
@@ -0,0 +1,64 @@
|
||||
pub mod client;
|
||||
pub mod server;
|
||||
|
||||
/// The maximum size of a message that can be sent over IPC.
|
||||
/// According to the documentation, the maximum size sent to the browser is 1MB.
|
||||
/// While the maximum size sent from the browser to the native messaging host is 4GB.
|
||||
///
|
||||
/// Currently we are setting the maximum both ways to be 1MB.
|
||||
///
|
||||
/// https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/Native_messaging#app_side
|
||||
/// https://developer.chrome.com/docs/extensions/develop/concepts/native-messaging#native-messaging-host-protocol
|
||||
pub const NATIVE_MESSAGING_BUFFER_SIZE: usize = 1024 * 1024;
|
||||
|
||||
/// The maximum number of messages that can be buffered in a channel.
|
||||
/// This number is more or less arbitrary and can be adjusted as needed,
|
||||
/// but ideally the messages should be processed as quickly as possible.
|
||||
pub const MESSAGE_CHANNEL_BUFFER: usize = 32;
|
||||
|
||||
/// Resolve the path to the IPC socket.
|
||||
pub fn path(name: &str) -> std::path::PathBuf {
|
||||
#[cfg(target_os = "windows")]
|
||||
{
|
||||
// Use a unique IPC pipe //./pipe/xxxxxxxxxxxxxxxxx.app.bitwarden per user.
|
||||
// Hashing prevents problems with reserved characters and file length limitations.
|
||||
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
|
||||
use sha2::Digest;
|
||||
let home = dirs::home_dir().unwrap();
|
||||
let hash = sha2::Sha256::digest(home.as_os_str().as_encoded_bytes());
|
||||
let hash_b64 = URL_SAFE_NO_PAD.encode(hash.as_slice());
|
||||
|
||||
format!(r"\\.\pipe\{hash_b64}.app.{name}").into()
|
||||
}
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
let mut home = dirs::home_dir().unwrap();
|
||||
|
||||
// When running in an unsandboxed environment, path is: /Users/<user>/
|
||||
// While running sandboxed, it's different: /Users/<user>/Library/Containers/com.bitwarden.desktop/Data
|
||||
//
|
||||
// We want to use App Groups in /Users/<user>/Library/Group Containers/LTZ2PFU5D6.com.bitwarden.desktop,
|
||||
// so we need to remove all the components after the user.
|
||||
// Note that we subtract 3 because the root directory is counted as a component (/, Users, <user>).
|
||||
let num_components = home.components().count();
|
||||
for _ in 0..num_components - 3 {
|
||||
home.pop();
|
||||
}
|
||||
|
||||
home.join(format!(
|
||||
"Library/Group Containers/LTZ2PFU5D6.com.bitwarden.desktop/tmp/app.{name}"
|
||||
))
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
// On Linux, we use the user's cache directory.
|
||||
let home = dirs::cache_dir().unwrap();
|
||||
let path_dir = home.join("com.bitwarden.desktop");
|
||||
|
||||
// The chache directory might not exist, so create it
|
||||
let _ = std::fs::create_dir_all(&path_dir);
|
||||
path_dir.join(format!("app.{name}"))
|
||||
}
|
||||
}
|
||||
232
apps/desktop/desktop_native/core/src/ipc/server.rs
Normal file
232
apps/desktop/desktop_native/core/src/ipc/server.rs
Normal file
@@ -0,0 +1,232 @@
|
||||
use std::{error::Error, path::Path, vec};
|
||||
|
||||
use futures::TryFutureExt;
|
||||
|
||||
use anyhow::Result;
|
||||
use interprocess::local_socket::{tokio::prelude::*, GenericFilePath, ListenerOptions};
|
||||
use log::{error, info};
|
||||
use tokio::{
|
||||
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
|
||||
sync::{broadcast, mpsc},
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use super::{MESSAGE_CHANNEL_BUFFER, NATIVE_MESSAGING_BUFFER_SIZE};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Message {
|
||||
pub client_id: u32,
|
||||
pub kind: MessageType,
|
||||
// This value should be Some for MessageType::Message and None for the rest
|
||||
pub message: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum MessageType {
|
||||
Connected,
|
||||
Disconnected,
|
||||
Message,
|
||||
}
|
||||
|
||||
pub struct Server {
|
||||
cancel_token: CancellationToken,
|
||||
server_to_clients_send: broadcast::Sender<String>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
/// Create and start the IPC server without blocking.
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
/// - `name`: The endpoint name to listen on. This name uniquely identifies the IPC connection and must be the same for both the server and client.
|
||||
/// - `client_to_server_send`: This [`mpsc::Sender<Message>`] will receive all the [`Message`]'s that the clients send to this server.
|
||||
pub fn start(
|
||||
path: &Path,
|
||||
client_to_server_send: mpsc::Sender<Message>,
|
||||
) -> Result<Self, Box<dyn Error>> {
|
||||
// If the unix socket file already exists, we get an error when trying to bind to it. So we remove it first.
|
||||
// Any processes that were using the old socket should remain connected to it but any new connections will use the new socket.
|
||||
if !cfg!(windows) {
|
||||
let _ = std::fs::remove_file(path);
|
||||
}
|
||||
|
||||
let name = path.as_os_str().to_fs_name::<GenericFilePath>()?;
|
||||
let opts = ListenerOptions::new().name(name);
|
||||
let listener = opts.create_tokio()?;
|
||||
|
||||
// This broadcast channel is used for sending messages to all connected clients, and so the sender
|
||||
// will be stored in the server while the receiver will be cloned and passed to each client handler.
|
||||
let (server_to_clients_send, server_to_clients_recv) =
|
||||
broadcast::channel::<String>(MESSAGE_CHANNEL_BUFFER);
|
||||
|
||||
// This cancellation token allows us to cleanly stop the server and all the spawned
|
||||
// tasks without having to wait on all the pending tasks finalizing first
|
||||
let cancel_token = CancellationToken::new();
|
||||
|
||||
// Create the server and start listening for incoming connections
|
||||
// in a separate task to avoid blocking the current task
|
||||
let server = Server {
|
||||
cancel_token: cancel_token.clone(),
|
||||
server_to_clients_send,
|
||||
};
|
||||
tokio::spawn(listen_incoming(
|
||||
listener,
|
||||
client_to_server_send,
|
||||
server_to_clients_recv,
|
||||
cancel_token,
|
||||
));
|
||||
|
||||
Ok(server)
|
||||
}
|
||||
|
||||
/// Send a message over the IPC server to all the connected clients
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// The number of clients that the message was sent to. Note that the number of messages
|
||||
/// sent may be less than the number of connected clients if some clients disconnect while
|
||||
/// the message is being sent.
|
||||
pub fn send(&self, message: String) -> Result<usize> {
|
||||
let sent = self.server_to_clients_send.send(message)?;
|
||||
Ok(sent)
|
||||
}
|
||||
|
||||
/// Stop the IPC server.
|
||||
pub fn stop(&self) {
|
||||
self.cancel_token.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Server {
|
||||
fn drop(&mut self) {
|
||||
self.stop();
|
||||
}
|
||||
}
|
||||
|
||||
async fn listen_incoming(
|
||||
listener: LocalSocketListener,
|
||||
client_to_server_send: mpsc::Sender<Message>,
|
||||
server_to_clients_recv: broadcast::Receiver<String>,
|
||||
cancel_token: CancellationToken,
|
||||
) {
|
||||
// We use a simple incrementing ID for each client
|
||||
let mut next_client_id = 1_u32;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel_token.cancelled() => {
|
||||
info!("IPC server cancelled.");
|
||||
break;
|
||||
},
|
||||
|
||||
// A new client connection has been established
|
||||
msg = listener.accept() => {
|
||||
match msg {
|
||||
Ok(client_stream) => {
|
||||
let client_id = next_client_id;
|
||||
next_client_id += 1;
|
||||
|
||||
let future = handle_connection(
|
||||
client_stream,
|
||||
client_to_server_send.clone(),
|
||||
// We resubscribe to the receiver here so this task can have it's own copy
|
||||
// Note that this copy will only receive messages sent after this point,
|
||||
// but that is okay, realistically we don't want any messages before we get a chance
|
||||
// to send the connected message to the client, which is done inside [`handle_connection`]
|
||||
server_to_clients_recv.resubscribe(),
|
||||
cancel_token.clone(),
|
||||
client_id
|
||||
);
|
||||
tokio::spawn(future.map_err(|e| {
|
||||
error!("Error handling connection: {}", e)
|
||||
}));
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Error accepting connection: {}", e);
|
||||
break;
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_connection(
|
||||
mut client_stream: impl AsyncRead + AsyncWrite + Unpin,
|
||||
client_to_server_send: mpsc::Sender<Message>,
|
||||
mut server_to_clients_recv: broadcast::Receiver<String>,
|
||||
cancel_token: CancellationToken,
|
||||
client_id: u32,
|
||||
) -> Result<(), Box<dyn Error>> {
|
||||
client_to_server_send
|
||||
.send(Message {
|
||||
client_id,
|
||||
kind: MessageType::Connected,
|
||||
message: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let mut buf = vec![0u8; NATIVE_MESSAGING_BUFFER_SIZE];
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel_token.cancelled() => {
|
||||
info!("Client {client_id} cancelled.");
|
||||
break;
|
||||
},
|
||||
|
||||
// Forward messages to the IPC clients
|
||||
msg = server_to_clients_recv.recv() => {
|
||||
match msg {
|
||||
Ok(msg) => {
|
||||
client_stream.write_all(msg.as_bytes()).await?;
|
||||
},
|
||||
Err(e) => {
|
||||
info!("Error reading message: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
// Forwards messages from the IPC clients to the server
|
||||
// Note that we also send connect and disconnect events so that
|
||||
// the server can keep track of multiple clients
|
||||
result = client_stream.read(&mut buf) => {
|
||||
match result {
|
||||
Err(e) => {
|
||||
info!("Error reading from client {client_id}: {e}");
|
||||
|
||||
client_to_server_send.send(Message {
|
||||
client_id,
|
||||
kind: MessageType::Disconnected,
|
||||
message: None,
|
||||
}).await?;
|
||||
break;
|
||||
},
|
||||
Ok(0) => {
|
||||
info!("Client {client_id} disconnected.");
|
||||
|
||||
client_to_server_send.send(Message {
|
||||
client_id,
|
||||
kind: MessageType::Disconnected,
|
||||
message: None,
|
||||
}).await?;
|
||||
break;
|
||||
},
|
||||
Ok(size) => {
|
||||
let msg = std::str::from_utf8(&buf[..size])?;
|
||||
|
||||
client_to_server_send.send(Message {
|
||||
client_id,
|
||||
kind: MessageType::Message,
|
||||
message: Some(msg.to_string()),
|
||||
}).await?;
|
||||
},
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,7 +1,13 @@
|
||||
#[cfg(feature = "sys")]
|
||||
pub mod biometric;
|
||||
#[cfg(feature = "sys")]
|
||||
pub mod clipboard;
|
||||
pub mod crypto;
|
||||
pub mod error;
|
||||
pub mod ipc;
|
||||
#[cfg(feature = "sys")]
|
||||
pub mod password;
|
||||
#[cfg(feature = "sys")]
|
||||
pub mod process_isolation;
|
||||
#[cfg(feature = "sys")]
|
||||
pub mod powermonitor;
|
||||
|
||||
Reference in New Issue
Block a user