// tokio_stream/stream_ext/collect.rs

1use crate::Stream;
2
3use core::future::Future;
4use core::marker::{PhantomData, PhantomPinned};
5use core::mem;
6use core::pin::Pin;
7use core::task::{ready, Context, Poll};
8use pin_project_lite::pin_project;
9use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, LinkedList, VecDeque};
10use std::hash::Hash;
11
// Do not export this struct until `FromStream` can be unsealed.
pin_project! {
    /// Future returned by the [`collect`](super::StreamExt::collect) method.
    ///
    /// Type parameters: `T` is the source stream, `U` is the collection type
    /// the future resolves to, and `C` is the intermediate accumulator
    /// (`U::InternalCollection` in the impls in this file).
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    #[derive(Debug)]
    pub struct Collect<T, U, C>
    {
        // Source of items; marked `#[pin]` so it can be polled through the projection.
        #[pin]
        stream: T,
        // Accumulator that receives each item yielded by `stream`.
        collection: C,
        // Ties the output type `U` to the struct without storing a value of it.
        _output: PhantomData<U>,
        // Make this future `!Unpin` for compatibility with async trait methods.
        #[pin]
        _pin: PhantomPinned,
    }
}
28
/// Convert from a [`Stream`].
///
/// This trait is not intended to be used directly. Instead, call
/// [`StreamExt::collect()`](super::StreamExt::collect).
///
/// The trait declares no methods of its own; the collection steps
/// (`initialize`, `extend`, `finalize`) are declared on the hidden
/// `sealed::FromStreamPriv` supertrait.
///
/// # Implementing
///
/// Currently, this trait may not be implemented by third parties. The trait is
/// sealed in order to make changes in the future. Stabilization is pending
/// enhancements to the Rust language.
pub trait FromStream<T>: sealed::FromStreamPriv<T> {}
40
41impl<T, U> Collect<T, U, U::InternalCollection>
42where
43    T: Stream,
44    U: FromStream<T::Item>,
45{
46    pub(super) fn new(stream: T) -> Collect<T, U, U::InternalCollection> {
47        let (lower, upper) = stream.size_hint();
48        let collection = U::initialize(sealed::Internal, lower, upper);
49
50        Collect {
51            stream,
52            collection,
53            _output: PhantomData,
54            _pin: PhantomPinned,
55        }
56    }
57}
58
59impl<T, U> Future for Collect<T, U, U::InternalCollection>
60where
61    T: Stream,
62    U: FromStream<T::Item>,
63{
64    type Output = U;
65
66    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> {
67        use Poll::Ready;
68
69        loop {
70            let me = self.as_mut().project();
71
72            let item = match ready!(me.stream.poll_next(cx)) {
73                Some(item) => item,
74                None => {
75                    return Ready(U::finalize(sealed::Internal, me.collection));
76                }
77            };
78
79            if !U::extend(sealed::Internal, me.collection, item) {
80                return Ready(U::finalize(sealed::Internal, me.collection));
81            }
82        }
83    }
84}
85
// ===== FromStream implementations
87
88impl FromStream<()> for () {}
89
90impl sealed::FromStreamPriv<()> for () {
91    type InternalCollection = ();
92
93    fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {}
94
95    fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool {
96        true
97    }
98
99    fn finalize(_: sealed::Internal, _collection: &mut ()) {}
100}
101
102impl<T: AsRef<str>> FromStream<T> for String {}
103
104impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
105    type InternalCollection = String;
106
107    fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String {
108        String::new()
109    }
110
111    fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool {
112        collection.push_str(item.as_ref());
113        true
114    }
115
116    fn finalize(_: sealed::Internal, collection: &mut String) -> String {
117        mem::take(collection)
118    }
119}
120
121impl<T> FromStream<T> for Vec<T> {}
122
123impl<T> sealed::FromStreamPriv<T> for Vec<T> {
124    type InternalCollection = Vec<T>;
125
126    fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> {
127        Vec::with_capacity(lower)
128    }
129
130    fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
131        collection.push(item);
132        true
133    }
134
135    fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
136        mem::take(collection)
137    }
138}
139
140impl<T> FromStream<T> for VecDeque<T> {}
141
142impl<T> sealed::FromStreamPriv<T> for VecDeque<T> {
143    type InternalCollection = VecDeque<T>;
144
145    fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> VecDeque<T> {
146        VecDeque::with_capacity(lower)
147    }
148
149    fn extend(_: sealed::Internal, collection: &mut VecDeque<T>, item: T) -> bool {
150        collection.push_back(item);
151        true
152    }
153
154    fn finalize(_: sealed::Internal, collection: &mut VecDeque<T>) -> VecDeque<T> {
155        mem::take(collection)
156    }
157}
158
159impl<T> FromStream<T> for LinkedList<T> {}
160
161impl<T> sealed::FromStreamPriv<T> for LinkedList<T> {
162    type InternalCollection = LinkedList<T>;
163
164    fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> LinkedList<T> {
165        LinkedList::new()
166    }
167
168    fn extend(_: sealed::Internal, collection: &mut LinkedList<T>, item: T) -> bool {
169        collection.push_back(item);
170        true
171    }
172
173    fn finalize(_: sealed::Internal, collection: &mut LinkedList<T>) -> LinkedList<T> {
174        mem::take(collection)
175    }
176}
177
178impl<T: Ord> FromStream<T> for BTreeSet<T> {}
179
180impl<T: Ord> sealed::FromStreamPriv<T> for BTreeSet<T> {
181    type InternalCollection = BTreeSet<T>;
182
183    fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> BTreeSet<T> {
184        BTreeSet::new()
185    }
186
187    fn extend(_: sealed::Internal, collection: &mut BTreeSet<T>, item: T) -> bool {
188        collection.insert(item);
189        true
190    }
191
192    fn finalize(_: sealed::Internal, collection: &mut BTreeSet<T>) -> BTreeSet<T> {
193        mem::take(collection)
194    }
195}
196
197impl<K: Ord, V> FromStream<(K, V)> for BTreeMap<K, V> {}
198
199impl<K: Ord, V> sealed::FromStreamPriv<(K, V)> for BTreeMap<K, V> {
200    type InternalCollection = BTreeMap<K, V>;
201
202    fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> BTreeMap<K, V> {
203        BTreeMap::new()
204    }
205
206    fn extend(_: sealed::Internal, collection: &mut BTreeMap<K, V>, (key, value): (K, V)) -> bool {
207        collection.insert(key, value);
208        true
209    }
210
211    fn finalize(_: sealed::Internal, collection: &mut BTreeMap<K, V>) -> BTreeMap<K, V> {
212        mem::take(collection)
213    }
214}
215
216impl<T: Eq + Hash> FromStream<T> for HashSet<T> {}
217
218impl<T: Eq + Hash> sealed::FromStreamPriv<T> for HashSet<T> {
219    type InternalCollection = HashSet<T>;
220
221    fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> HashSet<T> {
222        HashSet::with_capacity(lower)
223    }
224
225    fn extend(_: sealed::Internal, collection: &mut HashSet<T>, item: T) -> bool {
226        collection.insert(item);
227        true
228    }
229
230    fn finalize(_: sealed::Internal, collection: &mut HashSet<T>) -> HashSet<T> {
231        mem::take(collection)
232    }
233}
234
235impl<K: Eq + Hash, V> FromStream<(K, V)> for HashMap<K, V> {}
236
237impl<K: Eq + Hash, V> sealed::FromStreamPriv<(K, V)> for HashMap<K, V> {
238    type InternalCollection = HashMap<K, V>;
239
240    fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> HashMap<K, V> {
241        HashMap::with_capacity(lower)
242    }
243
244    fn extend(_: sealed::Internal, collection: &mut HashMap<K, V>, (key, value): (K, V)) -> bool {
245        collection.insert(key, value);
246        true
247    }
248
249    fn finalize(_: sealed::Internal, collection: &mut HashMap<K, V>) -> HashMap<K, V> {
250        mem::take(collection)
251    }
252}
253
254impl<T: Ord> FromStream<T> for BinaryHeap<T> {}
255
256impl<T: Ord> sealed::FromStreamPriv<T> for BinaryHeap<T> {
257    type InternalCollection = BinaryHeap<T>;
258
259    fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> BinaryHeap<T> {
260        BinaryHeap::with_capacity(lower)
261    }
262
263    fn extend(_: sealed::Internal, collection: &mut BinaryHeap<T>, item: T) -> bool {
264        collection.push(item);
265        true
266    }
267
268    fn finalize(_: sealed::Internal, collection: &mut BinaryHeap<T>) -> BinaryHeap<T> {
269        mem::take(collection)
270    }
271}
272
273impl<T> FromStream<T> for Box<[T]> {}
274
275impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
276    type InternalCollection = Vec<T>;
277
278    fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> {
279        <Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper)
280    }
281
282    fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
283        <Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item)
284    }
285
286    fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> {
287        <Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection)
288            .into_boxed_slice()
289    }
290}
291
292impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {}
293
294impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E>
295where
296    U: FromStream<T>,
297{
298    type InternalCollection = Result<U::InternalCollection, E>;
299
300    fn initialize(
301        _: sealed::Internal,
302        lower: usize,
303        upper: Option<usize>,
304    ) -> Result<U::InternalCollection, E> {
305        Ok(U::initialize(sealed::Internal, lower, upper))
306    }
307
308    fn extend(
309        _: sealed::Internal,
310        collection: &mut Self::InternalCollection,
311        item: Result<T, E>,
312    ) -> bool {
313        assert!(collection.is_ok());
314        match item {
315            Ok(item) => {
316                let collection = collection.as_mut().ok().expect("invalid state");
317                U::extend(sealed::Internal, collection, item)
318            }
319            Err(err) => {
320                *collection = Err(err);
321                false
322            }
323        }
324    }
325
326    fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> {
327        if let Ok(collection) = collection.as_mut() {
328            Ok(U::finalize(sealed::Internal, collection))
329        } else {
330            let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0))));
331
332            Err(res.map(drop).unwrap_err())
333        }
334    }
335}
336
pub(crate) mod sealed {
    /// Token passed as the first argument of every [`FromStreamPriv`] method.
    ///
    /// It lives in this `pub(crate)` module, so it cannot be named from
    /// outside the crate.
    #[allow(missing_debug_implementations)]
    pub struct Internal;

    /// Three-step protocol for building `Self` from items of type `T`:
    /// `initialize` once, `extend` per item, `finalize` once.
    #[doc(hidden)]
    pub trait FromStreamPriv<T> {
        /// Accumulator that holds the items while collection is in progress.
        ///
        /// The name of this type is internal and cannot be relied upon.
        type InternalCollection;

        /// Creates the accumulator.
        ///
        /// `lower` and `upper` are size-hint bounds supplied by the caller.
        fn initialize(
            internal: Internal,
            lower: usize,
            upper: Option<usize>,
        ) -> Self::InternalCollection;

        /// Adds one received item to the accumulator.
        ///
        /// Returns `true` to keep streaming, or `false` to complete collection.
        fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool;

        /// Turns the accumulator into the target type.
        fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self;
    }
}