Mirror of https://github.com/bitwarden/browser (synced 2026-01-03 17:13:47 +00:00)
[PM-7846] Implement a Rust-based native messaging proxy and IPC system
apps/desktop/desktop_native/core/src/ipc/client.rs (new file, 91 lines)
@@ -0,0 +1,91 @@
use std::time::Duration;

use interprocess::local_socket::{
    tokio::{prelude::*, Stream},
    GenericFilePath, ToFsName,
};
use log::{error, info};
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    time::sleep,
};

pub async fn connect(
    tx: tokio::sync::mpsc::Sender<String>,
    mut rx: tokio::sync::mpsc::Receiver<String>,
) {
    // Keep track of connection failures to make sure we don't leave the process as a zombie
    let mut connection_failures = 0;

    loop {
        match connect_inner(&tx, &mut rx).await {
            Ok(()) => return,
            Err(e) => {
                connection_failures += 1;
                if connection_failures >= 20 {
                    error!("Failed to connect to IPC server after 20 attempts: {e}");
                    return;
                }

                error!("Failed to connect to IPC server: {e}");
            }
        }

        sleep(Duration::from_secs(5)).await;
    }
}

async fn connect_inner(
    tx: &tokio::sync::mpsc::Sender<String>,
    rx: &mut tokio::sync::mpsc::Receiver<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let path = super::path("bitwarden");

    info!("Attempting to connect to {}", path.display());

    let name = path.as_os_str().to_fs_name::<GenericFilePath>()?;
    let mut conn = Stream::connect(name).await?;

    info!("Connected to {}", path.display());

    tx.send("{\"command\":\"connected\"}".to_owned()).await?;

    let mut buffer = vec![0; 8192];

    // Listen to IPC messages
    loop {
        tokio::select! {
            // Send messages to the IPC server
            msg = rx.recv() => {
                match msg {
                    Some(msg) => {
                        conn.write_all(msg.as_bytes()).await?;
                    }
                    None => break,
                }
            },

            // Read messages from the IPC server
            res = conn.read(&mut buffer[..]) => {
                match res {
                    Err(e) => {
                        error!("Error reading from IPC server: {e}");
                        tx.send("{\"command\":\"disconnected\"}".to_owned()).await?;
                        break;
                    }
                    Ok(0) => {
                        info!("Connection closed");
                        tx.send("{\"command\":\"disconnected\"}".to_owned()).await?;
                        break;
                    }
                    Ok(n) => {
                        let message = String::from_utf8_lossy(&buffer[..n]).to_string();
                        tx.send(message).await?;
                    }
                }
            }
        }
    }

    Ok(())
}
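For orientation, here is a minimal sketch (not part of this diff) of how a caller such as the native messaging proxy might drive connect(); the run_proxy wrapper, the message contents, and the channel capacities are illustrative assumptions.

// Hypothetical proxy-side wiring, assuming the connect() function above is in scope.
async fn run_proxy() {
    // Everything read from the socket (plus the connected/disconnected
    // notifications) arrives on out_rx; anything sent to in_tx is written
    // to the socket.
    let (out_tx, mut out_rx) = tokio::sync::mpsc::channel::<String>(32);
    let (in_tx, in_rx) = tokio::sync::mpsc::channel::<String>(32);

    // Print everything coming back from the desktop app and send one test
    // message once the client reports it is connected.
    tokio::spawn(async move {
        while let Some(msg) = out_rx.recv().await {
            println!("from desktop app: {msg}");
            if msg.contains("\"connected\"") {
                let _ = in_tx.send(r#"{"command":"ping"}"#.to_owned()).await;
            }
        }
    });

    // connect() retries up to 20 times and only returns once the channels
    // close or the retry budget is exhausted.
    connect(out_tx, in_rx).await;
}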
apps/desktop/desktop_native/core/src/ipc/mod.rs (new file, 27 lines)
@@ -0,0 +1,27 @@
pub mod client;
pub mod server;

// TODO: We probably can find a better location for the IPC socket on Mac.
// One idea is to use App Groups if we have the entitlement:
// https://developer.apple.com/documentation/xcode/configuring-app-groups
// Then we can write the socket to /Users/<user>/Library/Group Containers/group.com.bitwarden.<our group name>/ipc.sock
// We might also be able to write to the Apps Container directory from the proxy binary:
// /Users/<user>/Library/Containers/com.bitwarden.desktop/Data

/// Resolve the path to the IPC socket.
pub fn path(name: &str) -> std::path::PathBuf {
    let home = dirs::home_dir().unwrap();

    if cfg!(windows) {
        // Use a unique IPC pipe //./pipe/bitwarden.xxxxxxxxxxxxxxxxx.sock per user.
        // Hashing prevents problems with reserved characters and file length limitations.
        use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
        use sha2::Digest;
        let hash = sha2::Sha256::digest(home.as_os_str().as_encoded_bytes());
        let hash_b64 = URL_SAFE_NO_PAD.encode(hash.as_slice());

        format!(r"\\.\pipe\{hash_b64}app.{name}").into()
    } else {
        home.join("tmp").join(format!("app.{name}"))
    }
}
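For illustration (not part of this diff), the same logical name resolves to very different locations per platform; this short snippet assumes the path() function above is reachable as ipc::path.

// Illustrative only.
let sock = ipc::path("bitwarden");
// Unix-like: $HOME/tmp/app.bitwarden
// Windows:   \\.\pipe\<base64url(sha256($HOME))>app.bitwarden
println!("IPC socket: {}", sock.display());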
apps/desktop/desktop_native/core/src/ipc/server.rs (new file, 190 lines)
@@ -0,0 +1,190 @@
use std::error::Error;

use futures::{Sink, SinkExt, TryFutureExt};

use anyhow::Result;
use interprocess::local_socket::{tokio::prelude::*, GenericFilePath, ListenerOptions};
use log::{error, info};
use tokio::{
    io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
    sync::broadcast::Receiver,
};
use tokio_util::sync::CancellationToken;

#[derive(Debug)]
pub struct Message {
    pub client_id: u32,
    pub kind: MessageType,
    pub message: String,
}

#[derive(Debug)]
pub enum MessageType {
    Connected,
    Disconnected,
    Message,
}

pub struct Server {
    cancel_token: CancellationToken,
    tx_connections: tokio::sync::broadcast::Sender<String>,
}

impl Server {
    pub fn start<T>(name: &str, tx: T) -> Result<Self, Box<dyn Error>>
    where
        T: Sink<Message> + Unpin + Send + Clone + 'static,
        <T as Sink<Message>>::Error: std::error::Error + 'static,
    {
        let path = super::path(name);

        // If the unix socket file already exists, we get an error when trying to bind to it. So we remove it first.
        // Any processes that were using the old socket will remain connected but any new connections will use the new socket.
        if !cfg!(windows) {
            let _ = std::fs::remove_file(&path);
        }

        let name = path.as_os_str().to_fs_name::<GenericFilePath>()?;
        let opts = ListenerOptions::new().name(name);
        let listener = opts.create_tokio()?;

        let (tx_connections, rx_connections) = tokio::sync::broadcast::channel::<String>(32);

        let cancel_token = CancellationToken::new();
        let cancel_token2 = cancel_token.clone();

        tokio::spawn(async move {
            futures::pin_mut!(listener);
            let mut next_client_id = 1_u32;

            loop {
                tokio::select! {
                    _ = cancel_token2.cancelled() => {
                        info!("IPC server cancelled.");
                        break;
                    },

                    msg = listener.accept() => {
                        match msg {
                            Ok(stream) => {
                                let client_id = next_client_id;
                                next_client_id += 1;

                                let rx_connections = rx_connections.resubscribe();
                                let tx = tx.clone();
                                let cancel_token_clone = cancel_token2.clone();

                                tokio::spawn(handle_connection(stream, tx, rx_connections, cancel_token_clone, client_id).map_err(|e| {
                                    error!("Error handling connection: {}", e)
                                }));
                            },
                            Err(e) => {
                                error!("Error reading message: {}", e);
                                break;
                            },
                        }
                    }
                }
            }
        });

        Ok(Self {
            cancel_token,
            tx_connections,
        })
    }

    pub fn send(&self, message: String) -> Result<()> {
        self.tx_connections.send(message)?;
        Ok(())
    }

    pub fn stop(&self) {
        self.cancel_token.cancel();
    }
}

impl Drop for Server {
    fn drop(&mut self) {
        self.stop();
    }
}

async fn handle_connection<T>(
    mut stream: impl AsyncRead + AsyncWrite + Unpin,
    mut tx: T,
    mut rx_connections: Receiver<String>,
    cancel_token: CancellationToken,
    client_id: u32,
) -> Result<(), Box<dyn Error>>
where
    T: Sink<Message> + Unpin,
    <T as Sink<Message>>::Error: std::error::Error + 'static,
{
    tx.send(Message {
        client_id,
        kind: MessageType::Connected,
        message: "Connected".to_owned(),
    })
    .await?;

    let mut buf = vec![0u8; 8192];

    loop {
        tokio::select! {
            _ = cancel_token.cancelled() => {
                info!("Client {client_id} cancelled.");
                break;
            },

            msg = rx_connections.recv() => {
                match msg {
                    Ok(msg) => {
                        stream.write_all(msg.as_bytes()).await?;
                    },
                    Err(e) => {
                        info!("Error reading message: {}", e);
                        break;
                    }
                }
            },

            result = stream.read(&mut buf) => {
                match result {
                    Err(e) => {
                        info!("Error reading from client {client_id}: {e}");

                        tx.send(Message {
                            client_id,
                            kind: MessageType::Disconnected,
                            message: "Disconnected".to_owned(),
                        }).await?;
                        break;
                    },
                    Ok(0) => {
                        info!("Client {client_id} disconnected.");

                        tx.send(Message {
                            client_id,
                            kind: MessageType::Disconnected,
                            message: "Disconnected".to_owned(),
                        }).await?;
                        break;
                    },
                    Ok(size) => {
                        let msg = std::str::from_utf8(&buf[..size])?;

                        tx.send(Message {
                            client_id,
                            kind: MessageType::Message,
                            message: msg.to_string(),
                        }).await?;
                    },
                }
            }
        }
    }

    Ok(())
}
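For orientation, a minimal sketch (not part of this diff) of how the desktop side might consume this server. It assumes the Server, Message, and MessageType items above are in scope, and uses a futures unbounded channel as one way to satisfy start()'s Sink<Message> + Clone bound; the run_server wrapper and echo behaviour are illustrative assumptions.

// Hypothetical host-side wiring.
use futures::StreamExt;
use log::{error, info};

async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
    // UnboundedSender<Message> implements Sink<Message> + Clone, so it can be
    // handed to Server::start as the per-client message sink.
    let (tx, mut rx) = futures::channel::mpsc::unbounded::<Message>();
    let server = Server::start("bitwarden", tx)?;

    while let Some(msg) = rx.next().await {
        match msg.kind {
            MessageType::Connected => info!("client {} connected", msg.client_id),
            MessageType::Disconnected => info!("client {} disconnected", msg.client_id),
            MessageType::Message => {
                // Echo every payload back to all connected clients via the
                // broadcast channel inside the server.
                if let Err(e) = server.send(msg.message) {
                    error!("broadcast failed: {e}");
                }
            }
        }
    }
    Ok(())
}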
@@ -2,4 +2,5 @@ pub mod biometric;
pub mod clipboard;
pub mod crypto;
pub mod error;
pub mod ipc;
pub mod password;