1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use futures::Future;
use slog::{debug, error, info, warn, Logger};
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::thread;
use tokio::runtime::TaskExecutor;
use types::EthSpec;
use ws::{Sender, WebSocket};

mod config;

pub use config::Config;

/// Handle used to push messages out to the clients of the websocket server.
///
/// `T` is carried purely as a type-level marker; no value of type `T` is stored.
pub struct WebSocketSender<T: EthSpec> {
    /// Broadcast handle of the server. `None` for a sender created via `dummy()`, in which
    /// case sending is a no-op.
    sender: Option<Sender>,
    /// Zero-sized marker tying this sender to the `EthSpec` implementation `T`.
    _phantom: PhantomData<T>,
}

impl<T: EthSpec> WebSocketSender<T> {
    /// Creates a dummy websocket server that never starts and where all future calls are no-ops.
    pub fn dummy() -> Self {
        Self {
            sender: None,
            _phantom: PhantomData,
        }
    }

    pub fn send_string(&self, string: String) -> Result<(), String> {
        if let Some(sender) = &self.sender {
            sender
                .send(string)
                .map_err(|e| format!("Unable to broadcast to websocket clients: {:?}", e))
        } else {
            Ok(())
        }
    }
}

/// Starts a websocket server on `config.listen_address:config.port`.
///
/// The server ignores every incoming message. It is run on a freshly spawned thread, and a
/// future is placed on `executor` that shuts the server down once the exit future paired with
/// the returned signal completes.
///
/// # Returns
///
/// A tuple of:
/// - a `WebSocketSender` wrapping the server's broadcaster,
/// - the `exit_future::Signal` paired with the shutdown future,
/// - the local address reported by the server after binding.
///
/// # Errors
///
/// Returns a descriptive `String` if the server cannot be initialized, cannot be bound to the
/// configured address, or if its local address cannot be read.
pub fn start_server<T: EthSpec>(
    config: &Config,
    executor: &TaskExecutor,
    log: &Logger,
) -> Result<(WebSocketSender<T>, exit_future::Signal, SocketAddr), String> {
    let bind_target = format!("{}:{}", config.listen_address, config.port);

    // The per-connection handler accepts every incoming message and does nothing with it.
    let unbound = WebSocket::new(|_| |_| Ok(()))
        .map_err(|err| format!("Failed to initialize websocket server: {:?}", err))?;
    let server = unbound.bind(bind_target.clone()).map_err(|err| {
        format!(
            "Failed to bind websocket server to {}: {:?}",
            bind_target, err
        )
    })?;

    let bound_addr = server.local_addr().map_err(|err| {
        format!(
            "Failed to read listening addr from websocket server: {:?}",
            err
        )
    })?;

    // One broadcaster is handed back to the caller, a second one is kept for shutdown.
    let client_broadcaster = server.broadcaster();
    let shutdown_broadcaster = server.broadcaster();

    // Signal/channel pair used to gracefully shut the websocket server down.
    let (exit_signal, exit) = exit_future::signal();

    let shutdown_log = log.clone();
    let shutdown_task = exit.and_then(move |_| {
        match shutdown_broadcaster.shutdown() {
            Ok(_) => {
                info!(shutdown_log, "Websocket server shutdown");
            }
            Err(e) => {
                warn!(
                    shutdown_log,
                    "Websocket server errored on shutdown";
                    "error" => format!("{:?}", e)
                );
            }
        }
        Ok(())
    });

    // The executor drives the shutdown future, so the server is stopped when the application
    // exits.
    executor.spawn(shutdown_task);

    // `run` is invoked on its own thread; the join handle is intentionally not kept.
    let thread_log = log.clone();
    let _handle = thread::spawn(move || {
        if let Err(e) = server.run() {
            error!(
                thread_log,
                "Websocket server failed to start";
                "error" => format!("{:?}", e)
            );
        } else {
            debug!(
                thread_log,
                "Websocket server thread stopped";
            );
        }
    });

    info!(
        log,
        "WebSocket server started";
        "address" => format!("{}", bound_addr.ip()),
        "port" => bound_addr.port(),
    );

    let sender = WebSocketSender {
        sender: Some(client_broadcaster),
        _phantom: PhantomData,
    };

    Ok((sender, exit_signal, bound_addr))
}