--- src/client.rs.orig 2018-10-14 20:00:28 UTC +++ src/client.rs @@ -1,14 +1,14 @@ -use std::io; -use error::RbtError; -use amqp::{self, Session, Options, Channel}; -use amqp::protocol::basic::{Deliver, BasicProperties}; +use amqp::protocol::basic::{BasicProperties, Deliver}; use amqp::Basic; +use amqp::{self, Channel, Options, Session}; use amqp::{Table, TableEntry}; +use error::RbtError; +use std::io; +use std::error::Error; +use std::sync::mpsc; use std::thread; use std::time::Duration; -use std::sync::mpsc; -use std::error::Error; pub struct Sendable { pub exchange: String, @@ -18,21 +18,20 @@ pub struct Sendable { pub file_name: String, pub reader: Box, pub priority: u8, - pub rpctimeout: u64 + pub rpctimeout: u64, } -pub type ReceiveCb = FnMut(&mut Channel, Deliver, BasicProperties, Vec) -> Result<(), RbtError> + Send; +pub type ReceiveCb = + FnMut(&mut Channel, Deliver, BasicProperties, Vec) -> Result<(), RbtError> + Send; pub struct Receiver { - pub exchange:String, + pub exchange: String, pub routing_key: Option, pub auto_ack: bool, - pub callback:Box, + pub callback: Box, } - -pub fn open_send(o:Options, s:Sendable, r:Option) -> Result<(),RbtError> { - +pub fn open_send(o: Options, s: Sendable, r: Option) -> Result<(), RbtError> { // open the channel let (mut session, mut channel) = _open(o)?; @@ -49,7 +48,10 @@ pub fn open_send(o:Options, s:Sendable, r:Option false + } + None => false, }; // read input input buffer @@ -87,12 +89,14 @@ pub fn open_send(o:Options, s:Sendable, r:Option { ch.close(200, "Bye")?; - } + } Err(err) => { if err.description() == "timed out waiting on channel".to_string() { println!("Error timeout"); @@ -121,10 +125,9 @@ pub fn open_send(o:Options, s:Sendable, r:Option TableEntry { +fn narrow(str: &str) -> TableEntry { let boolv = str.parse::(); if !boolv.is_err() { TableEntry::Bool(boolv.unwrap()) @@ -138,19 +141,22 @@ fn narrow(str:&str) -> TableEntry { } } - -fn _open(o:Options) -> Result<(Session, Channel),RbtError> { -// errln!("Connecting to amqp://{}:{}@{}:{}/{}", -// o.login, o.password, o.host, o.port, o.vhost); +fn _open(o: Options) -> Result<(Session, Channel), RbtError> { + // errln!("Connecting to amqp://{}:{}@{}:{}/{}", + // o.login, o.password, o.host, o.port, o.vhost); let mut session = Session::new(o)?; let channel = session.open_channel(1)?; Ok((session, channel)) } impl amqp::Consumer for Receiver { - fn handle_delivery(&mut self, channel:&mut Channel, deliver:Deliver, - headers:BasicProperties, body:Vec){ - + fn handle_delivery( + &mut self, + channel: &mut Channel, + deliver: Deliver, + headers: BasicProperties, + body: Vec, + ) { let delivery_tag = deliver.delivery_tag.clone(); if self.auto_ack { @@ -160,12 +166,15 @@ impl amqp::Consumer for Receiver { // and deliver to callback ((self.callback)(channel, deliver, headers, body)).unwrap_or_else(::error::handle); - } } -pub fn open_receive(o:Options, q:Option, force_declare: bool, r:Receiver) -> Result<(),RbtError> { - +pub fn open_receive( + o: Options, + q: Option, + force_declare: bool, + r: Receiver, +) -> Result<(), RbtError> { // open session/channel let (_, mut channel) = _open(o)?; @@ -178,22 +187,33 @@ pub fn open_receive(o:Options, q:Option, force Ok(()) } - -fn do_open_receive(channel:&mut Channel, q:Option, force_declare: bool, r:Receiver) -> Result { - +fn do_open_receive( + channel: &mut Channel, + q: Option, + force_declare: bool, + r: Receiver, +) -> Result { let mut auto_delete = false; let mut bind_routing_key = r.routing_key.clone(); - + let queue_name = match q { Some(q) => { // Force the declaration of this queue if force_declare { // queue, passive, durable, exclusive, auto_delete, nowait, arguments - let queue_declare = channel.queue_declare(q, false, false, auto_delete, auto_delete, false, Table::new())?; + let queue_declare = channel.queue_declare( + q, + false, + false, + auto_delete, + auto_delete, + false, + Table::new(), + )?; // name is auto generated queue_declare.queue - }else{ + } else { q } } @@ -205,25 +225,33 @@ fn do_open_receive(channel:&mut Channel, q:Option