1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
//! This module is based on an analysis of an existing project by esp-rs team
//! member Juraj Sadel, which was presented at Espressif DevCon 2023.
//! Available at: https://github.com/JurajSadel/esp32c3-no-std-async-mqtt-demo

use core::fmt::Write as coreWrite;

use embassy_net::{dns::DnsQueryType, tcp::TcpSocket, Stack};
use embassy_time::{Duration, Timer};
use embedded_svc::wifi::{ClientConfiguration, Configuration};
use esp_println::println;
use esp_wifi::wifi::{WifiController, WifiDevice, WifiEvent, WifiStaDevice, WifiState};
use heapless::String;
use rust_mqtt::{
    client::{client::MqttClient, client_config::ClientConfig},
    packet::v5::reason_codes::ReasonCode,
    utils::rng_generator::CountingRng,
};
use smoltcp::wire::IpAddress;

/// IPv4 address of the HiveMQ MQTT broker, written as a dotted-quad string.
///
/// NOTE(review): nothing in the visible part of this module reads this
/// constant; `mqtt_connect_default` passes the hostname `"mqtt-dashboard.com"`
/// instead. Confirm whether the address is still current before relying on it.
pub const HIVE_MQ_IP: &str = "18.196.194.55";
/// Port number associated with the HiveMQ MQTT broker.
///
/// NOTE(review): `mqtt_connect_default` connects on port 1883, not on this
/// value — confirm which broker service this port is meant to address.
pub const HIVE_MQ_PORT: u16 = 8884;

#[macro_export]
/// Creates the four zero-initialised byte buffers used for an MQTT session.
///
/// Expands to a tuple `([u8; 1536], [u8; 1536], [u8; 4096], [u8; 4096])`.
/// The two 4096-byte arrays match the `&mut [u8; 4096]` MQTT write/receive
/// buffer parameters of the connect functions in this module; the two
/// 1536-byte arrays are presumably meant for the socket RX/TX buffers.
macro_rules! prepare_buffers {
    () => {{
        const SOCKET_BUFFER_LEN: usize = 1536;
        const MQTT_BUFFER_LEN: usize = 4096;

        (
            [0u8; SOCKET_BUFFER_LEN],
            [0u8; SOCKET_BUFFER_LEN],
            [0u8; MQTT_BUFFER_LEN],
            [0u8; MQTT_BUFFER_LEN],
        )
    }};
}

#[macro_export]
/// Suspends the surrounding async code until the network link is up.
///
/// Polls `$stack.is_link_up()` and sleeps 500 ms between polls, so it must be
/// expanded inside an `async` context. Typically used after the `net_task`
/// async task has been spawned.
///
/// `$config` is accepted as a second argument but is not used by the
/// expansion.
macro_rules! wait_wifi {
    ($stack:expr, $config:ident) => {
        while !$stack.is_link_up() {
            embassy_time::Timer::after(embassy_time::Duration::from_millis(500)).await;
        }
    };
}

/// Waits until the network stack has an IPv4 configuration and prints its
/// address.
///
/// Polls `$stack.config_v4()` every 500 ms. Once a configuration is available
/// it is bound to the identifier `$config` for the duration of the log line
/// and the wait ends. Must be expanded inside an `async` context, and a
/// `println!` macro has to be in scope at the call site because the expansion
/// uses it unqualified.
#[macro_export]
macro_rules! get_ip {
    ($stack:expr, $config:ident) => {
        loop {
            match $stack.config_v4() {
                Some($config) => {
                    println!("Got IP: {}", $config.address); // dhcp IP address
                    break;
                }
                None => {
                    embassy_time::Timer::after(embassy_time::Duration::from_millis(500)).await
                }
            }
        }
    };
}

/// Creates a statically allocated `embassy_net::Stack` for WiFi communication.
///
/// Both the stack and its `StackResources::<3>` are placed in static storage
/// through `static_cell::make_static!`, and the macro evaluates to a shared
/// reference to the stack.
///
/// # Forms
/// * `create_stack!(wifi_interface, config)` — uses the fixed seed `1234`
///   (kept for backward compatibility).
/// * `create_stack!(wifi_interface, config, seed)` — uses the caller-supplied
///   seed. Prefer this form with a value taken from a hardware random number
///   generator: the seed is handed to `embassy_net::Stack::new` as its random
///   seed, so a constant makes whatever the stack derives from it identical
///   on every boot.
#[macro_export]
macro_rules! create_stack {
    ($wifi_interface:expr, $config:expr) => {
        $crate::create_stack!($wifi_interface, $config, 1234)
    };
    ($wifi_interface:expr, $config:expr, $seed:expr) => {{
        let seed = $seed;

        &*static_cell::make_static!(embassy_net::Stack::new(
            $wifi_interface,
            $config,
            static_cell::make_static!(embassy_net::StackResources::<3>::new()),
            seed
        ))
    }};
}

/// Connects to the default public broker (`mqtt-dashboard.com`, port 1883)
/// without credentials.
///
/// This is a convenience wrapper around [`mqtt_connect_custom`]; all buffers
/// and the client id are forwarded unchanged.
///
/// # Arguments
/// * `stack` - A reference to the network stack.
/// * `client_id` - The MQTT client identifier.
/// * `rx_buffer_socket` - Receive buffer for the socket.
/// * `tx_buffer_socket` - Transmit buffer for the socket.
/// * `write_buffer_mqtt` - Write buffer for the MQTT client.
/// * `recv_buffer_mqtt` - Receive buffer for the MQTT client.
///
/// # Returns
/// The `MqttClient` produced by [`mqtt_connect_custom`].
pub async fn mqtt_connect_default<'a>(
    stack: &'static Stack<WifiDevice<'static, WifiStaDevice>>,
    client_id: &'a str,
    rx_buffer_socket: &'a mut [u8],
    tx_buffer_socket: &'a mut [u8],
    write_buffer_mqtt: &'a mut [u8; 4096],
    recv_buffer_mqtt: &'a mut [u8; 4096],
) -> MqttClient<'a, TcpSocket<'a>, 5, CountingRng> {
    // Default broker settings: host name, port and no credentials.
    let default_host = "mqtt-dashboard.com";
    let default_port: u16 = 1883;
    let no_username: Option<&'a str> = None;
    let no_password: Option<&'a str> = None;

    mqtt_connect_custom(
        stack,
        client_id,
        rx_buffer_socket,
        tx_buffer_socket,
        write_buffer_mqtt,
        recv_buffer_mqtt,
        default_host,
        default_port,
        no_username,
        no_password,
    )
    .await
}

/// Establishes a custom MQTT connection with the specified parameters.
///
/// # Arguments
/// * `stack` - The network `Stack` to use for the MQTT connection.
/// * `client_id` - The client ID for the MQTT session.
/// * `rx_buffer_socket` - Receive buffer for the socket connection.
/// * `tx_buffer_socket` - Transmit buffer for the socket connection.
/// * `write_buffer_mqtt` - Write buffer for the MQTT client.
/// * `recv_buffer_mqtt` - Receive buffer for the MQTT client.
/// * `broker_address` - The address of the MQTT broker.
/// * `broker_port` - The port of the MQTT broker.
/// * `username` - Optional username for MQTT broker authentication.
/// * `password` - Optional password for MQTT broker authentication.
///
/// # Returns
/// Returns an `MqttClient` instance configured for the specified broker and
/// credentials.

pub async fn mqtt_connect_custom<'a>(
    stack: &'static Stack<WifiDevice<'static, WifiStaDevice>>,
    client_id: &'a str,
    rx_buffer_socket: &'a mut [u8],
    tx_buffer_socket: &'a mut [u8],
    write_buffer_mqtt: &'a mut [u8; 4096],
    recv_buffer_mqtt: &'a mut [u8; 4096],
    broker_address: &str, // IP address or hostname of the MQTT broker
    broker_port: u16,     /* Port of the MQTT broker (usually 1883 for MQTT, 8883 for MQTT
                           * over SSL, but make sure to unclude some TSL certification in your
                           * code then) */
    username: Option<&'a str>, // Optional username for MQTT broker authentication
    password: Option<&'a str>, // Optional password for MQTT broker authentication
) -> MqttClient<'a, TcpSocket<'a>, 5, CountingRng> {
    let mut socket = TcpSocket::new(stack, rx_buffer_socket, tx_buffer_socket);

    socket.set_timeout(Some(embassy_time::Duration::from_secs(10)));

    let address = match stack
        .dns_query(broker_address, DnsQueryType::A)
        .await
        .map(|a| a[0])
    {
        Ok(addr) => addr,
        Err(_) => {
            let addr = super::wifi::ip_string_to_parts(broker_address).unwrap();
            IpAddress::v4(addr[0], addr[1], addr[2], addr[3])
        }
    };

    let remote_endpoint = (address, broker_port);
    println!("connecting...");
    let connection = socket.connect(remote_endpoint).await;
    if let Err(e) = connection {
        println!("connect error: {:?}", e);
    }
    println!("connected!");

    let mut config = ClientConfig::new(
        rust_mqtt::client::client_config::MqttVersion::MQTTv5,
        CountingRng(20000),
    );
    config.add_max_subscribe_qos(rust_mqtt::packet::v5::publish_packet::QualityOfService::QoS1);
    config.add_client_id(client_id);
    config.max_packet_size = 149504;

    // Optionally set the username and password
    if let Some(user) = username {
        config.add_username(user);
    }
    if let Some(pass) = password {
        config.add_password(pass);
    }

    let mut client = MqttClient::<_, 5, _>::new(
        socket,
        write_buffer_mqtt,
        4096,
        recv_buffer_mqtt,
        4096,
        config,
    );

    loop {
        match client.connect_to_broker().await {
            Ok(()) => {
                println!("Connected to broker!");
                return client;
            }
            Err(mqtt_error) => match mqtt_error {
                ReasonCode::NetworkError => {
                    println!("MQTT Network Error");
                }
                _ => {
                    println!("Other MQTT Error: {:?}", mqtt_error);
                }
            },
        }
    }
}

/// Embassy task that drives the network stack used for MQTT communication.
///
/// Logs `"Start net task"` and then awaits `stack.run()`. Other code in this
/// module (sockets, DNS queries) relies on the stack being driven, so this
/// task is presumably meant to be spawned once at start-up and kept running.
///
/// Only compiled when the `async` feature is enabled and the `docs` feature
/// is not.
///
/// # Arguments
/// * `stack` - Reference to the static network stack instance used for MQTT
///   operations.
#[cfg(all(feature = "async", not(feature = "docs")))]
#[embassy_executor::task]
pub async fn net_task(stack: &'static Stack<WifiDevice<'static, WifiStaDevice>>) {
    println!("Start net task");
    stack.run().await;
}

/// Embassy task that keeps the device connected to a WiFi network.
///
/// Each loop iteration:
/// 1. If the station is currently connected, waits for the
///    `StaDisconnected` event and then pauses 5 s.
/// 2. If the controller is not started, applies a client configuration built
///    from `ssid` / `pass` and starts it.
/// 3. Attempts to connect; on failure the error is logged and the task pauses
///    5 s before the next iteration.
///
/// Only compiled when the `async` feature is enabled and the `docs` feature
/// is not.
///
/// # Arguments
/// * `controller` - The WiFi controller for managing WiFi state and
///   configuration.
/// * `ssid` - The SSID of the WiFi network to connect to.
/// * `pass` - The password for the WiFi network.
///
/// # Panics
/// Panics if `ssid` or `pass` cannot be converted into the configuration's
/// field types, or if applying the configuration or starting the controller
/// fails.
#[cfg(all(feature = "async", not(feature = "docs")))]
#[embassy_executor::task]
pub async fn connection(
    mut controller: WifiController<'static>,
    ssid: &'static str,
    pass: &'static str,
) {
    println!("start connection task");
    println!("Device capabilities: {:?}", controller.get_capabilities());
    loop {
        // While connected there is nothing to do except wait for the link to
        // drop, then give the radio a moment before reconnecting.
        if matches!(esp_wifi::wifi::get_wifi_state(), WifiState::StaConnected) {
            controller.wait_for_event(WifiEvent::StaDisconnected).await;
            Timer::after(Duration::from_millis(5000)).await;
        }

        let already_started = matches!(controller.is_started(), Ok(true));
        if !already_started {
            let station_settings = ClientConfiguration {
                ssid: ssid.try_into().unwrap(),
                password: pass.try_into().unwrap(),
                ..Default::default()
            };
            let client_config = Configuration::Client(station_settings);
            controller
                .set_configuration(&(&client_config).into())
                .unwrap();
            println!("Starting wifi");
            controller.start().await.unwrap();
            println!("Wifi started!");
        }
        println!("About to connect...");

        match controller.connect().await {
            Err(e) => {
                println!("Failed to connect to wifi: {e:?}");
                Timer::after(Duration::from_millis(5000)).await;
            }
            Ok(_) => println!("Wifi connected!"),
        }
    }
}

/// This function attempts to send the message to a specific MQTT topic and
/// retries in case of network errors.
///
/// # Arguments
/// * `client` - A mutable reference to the MQTT client used for sending the
///   message.
/// * `topic_name` - The MQTT topic to which the message will be sent.
/// * `message` - The message payload as a string slice.

pub async fn mqtt_send<'a>(
    client: &mut MqttClient<'a, TcpSocket<'a>, 5, CountingRng>,
    topic_name: &'a str,
    message: &'a [u8],
) {
    loop {
        println!("About to send message");
        match client
            .send_message(
                topic_name,
                message,
                rust_mqtt::packet::v5::publish_packet::QualityOfService::QoS1,
                true,
            )
            .await
        {
            Ok(()) => {
                println!("Message sent");
                break;
            }
            Err(mqtt_error) => match mqtt_error {
                ReasonCode::NetworkError => {
                    println!("MQTT Network Error");
                    match client.connect_to_broker().await {
                        Ok(()) => {
                            println!("Reconnected to broker!");
                            continue;
                        }
                        Err(mqtt_error) => match mqtt_error {
                            ReasonCode::NetworkError => {
                                println!("MQTT Network Error");
                            }
                            _ => {
                                println!("Other MQTT Error: {:?}", mqtt_error);
                            }
                        },
                    }
                    continue;
                }
                _ => {
                    println!("Other MQTT Error: {:?}", mqtt_error);
                    continue;
                }
            },
        }
    }
}

/// Subscribes to an MQTT topic.
///
/// # Arguments
/// * `client` - A mutable reference to the MQTT client used for the
///   subscription.
/// * `topic_name` - The MQTT topic to which the client will subscribe.
///
/// This function attempts to subscribe to the topic and retries in case of
/// network errors.

pub async fn mqtt_subscribe<'a>(
    client: &mut MqttClient<'a, TcpSocket<'a>, 5, CountingRng>,
    topic_name: &'a str,
) {
    loop {
        println!("About to subscribe to topic");
        match client.subscribe_to_topic(topic_name).await {
            Ok(()) => {
                println!("Subscribed to topic");
                break;
            }
            Err(mqtt_error) => match mqtt_error {
                ReasonCode::NetworkError => {
                    println!("MQTT Network Error");
                    match client.connect_to_broker().await {
                        Ok(()) => {
                            println!("Reconnected to broker!");
                            continue;
                        }
                        Err(mqtt_error) => match mqtt_error {
                            ReasonCode::NetworkError => {
                                println!("MQTT Network Error");
                            }
                            _ => {
                                println!("Other MQTT Error: {:?}", mqtt_error);
                            }
                        },
                    }
                    continue;
                }
                _ => {
                    println!("Other MQTT Error: {:?}", mqtt_error);
                    continue;
                }
            },
        }
    }
}

/// Waits for and receives a message from the subscribed MQTT topics.
/// It handles reconnection in case of network errors.
/// # Arguments
/// * `client` - A mutable reference to the MQTT client used for receiving
///   messages.
///
/// # Returns
/// Returns a `String` containing the received message if successful.

pub async fn mqtt_receive<'a>(
    client: &mut MqttClient<'a, TcpSocket<'a>, 5, CountingRng>,
) -> String<32> {
    loop {
        match client.receive_message().await {
            Ok((msg_str, _)) => {
                println!("Message received: {}", msg_str);
                let mut string_to_return: String<32> = String::new();
                write!(string_to_return, "{}", msg_str).expect("write! failed...");
                return string_to_return;
            }
            Err(mqtt_error) => match mqtt_error {
                ReasonCode::NetworkError => {
                    match client.connect_to_broker().await {
                        Ok(()) => {
                            println!("Reconnected to broker!");
                            continue;
                        }
                        Err(mqtt_error) => match mqtt_error {
                            ReasonCode::NetworkError => {
                                println!("MQTT Network Error");
                            }
                            _ => {
                                println!("Other MQTT Error: {:?}", mqtt_error);
                            }
                        },
                    }
                    continue;
                }
                _ => {
                    println!("Other MQTT Error or no messages yet");
                    continue;
                }
            },
        }
    }
}

pub use create_stack;
pub use get_ip;
pub use prepare_buffers;
pub use wait_wifi;