1use crate::Stream;
2
3use core::future::Future;
4use core::marker::{PhantomData, PhantomPinned};
5use core::mem;
6use core::pin::Pin;
7use core::task::{ready, Context, Poll};
8use pin_project_lite::pin_project;
9use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, LinkedList, VecDeque};
10use std::hash::Hash;
11
12pin_project! {
14 #[must_use = "futures do nothing unless you `.await` or poll them"]
16 #[derive(Debug)]
17 pub struct Collect<T, U, C>
18 {
19 #[pin]
20 stream: T,
21 collection: C,
22 _output: PhantomData<U>,
23 #[pin]
25 _pin: PhantomPinned,
26 }
27}
28
29pub trait FromStream<T>: sealed::FromStreamPriv<T> {}
40
41impl<T, U> Collect<T, U, U::InternalCollection>
42where
43 T: Stream,
44 U: FromStream<T::Item>,
45{
46 pub(super) fn new(stream: T) -> Collect<T, U, U::InternalCollection> {
47 let (lower, upper) = stream.size_hint();
48 let collection = U::initialize(sealed::Internal, lower, upper);
49
50 Collect {
51 stream,
52 collection,
53 _output: PhantomData,
54 _pin: PhantomPinned,
55 }
56 }
57}
58
59impl<T, U> Future for Collect<T, U, U::InternalCollection>
60where
61 T: Stream,
62 U: FromStream<T::Item>,
63{
64 type Output = U;
65
66 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> {
67 use Poll::Ready;
68
69 loop {
70 let me = self.as_mut().project();
71
72 let item = match ready!(me.stream.poll_next(cx)) {
73 Some(item) => item,
74 None => {
75 return Ready(U::finalize(sealed::Internal, me.collection));
76 }
77 };
78
79 if !U::extend(sealed::Internal, me.collection, item) {
80 return Ready(U::finalize(sealed::Internal, me.collection));
81 }
82 }
83 }
84}
85
86impl FromStream<()> for () {}
89
90impl sealed::FromStreamPriv<()> for () {
91 type InternalCollection = ();
92
93 fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {}
94
95 fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool {
96 true
97 }
98
99 fn finalize(_: sealed::Internal, _collection: &mut ()) {}
100}
101
102impl<T: AsRef<str>> FromStream<T> for String {}
103
104impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
105 type InternalCollection = String;
106
107 fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String {
108 String::new()
109 }
110
111 fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool {
112 collection.push_str(item.as_ref());
113 true
114 }
115
116 fn finalize(_: sealed::Internal, collection: &mut String) -> String {
117 mem::take(collection)
118 }
119}
120
121impl<T> FromStream<T> for Vec<T> {}
122
123impl<T> sealed::FromStreamPriv<T> for Vec<T> {
124 type InternalCollection = Vec<T>;
125
126 fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> {
127 Vec::with_capacity(lower)
128 }
129
130 fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
131 collection.push(item);
132 true
133 }
134
135 fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
136 mem::take(collection)
137 }
138}
139
140impl<T> FromStream<T> for VecDeque<T> {}
141
142impl<T> sealed::FromStreamPriv<T> for VecDeque<T> {
143 type InternalCollection = VecDeque<T>;
144
145 fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> VecDeque<T> {
146 VecDeque::with_capacity(lower)
147 }
148
149 fn extend(_: sealed::Internal, collection: &mut VecDeque<T>, item: T) -> bool {
150 collection.push_back(item);
151 true
152 }
153
154 fn finalize(_: sealed::Internal, collection: &mut VecDeque<T>) -> VecDeque<T> {
155 mem::take(collection)
156 }
157}
158
159impl<T> FromStream<T> for LinkedList<T> {}
160
161impl<T> sealed::FromStreamPriv<T> for LinkedList<T> {
162 type InternalCollection = LinkedList<T>;
163
164 fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> LinkedList<T> {
165 LinkedList::new()
166 }
167
168 fn extend(_: sealed::Internal, collection: &mut LinkedList<T>, item: T) -> bool {
169 collection.push_back(item);
170 true
171 }
172
173 fn finalize(_: sealed::Internal, collection: &mut LinkedList<T>) -> LinkedList<T> {
174 mem::take(collection)
175 }
176}
177
178impl<T: Ord> FromStream<T> for BTreeSet<T> {}
179
180impl<T: Ord> sealed::FromStreamPriv<T> for BTreeSet<T> {
181 type InternalCollection = BTreeSet<T>;
182
183 fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> BTreeSet<T> {
184 BTreeSet::new()
185 }
186
187 fn extend(_: sealed::Internal, collection: &mut BTreeSet<T>, item: T) -> bool {
188 collection.insert(item);
189 true
190 }
191
192 fn finalize(_: sealed::Internal, collection: &mut BTreeSet<T>) -> BTreeSet<T> {
193 mem::take(collection)
194 }
195}
196
197impl<K: Ord, V> FromStream<(K, V)> for BTreeMap<K, V> {}
198
199impl<K: Ord, V> sealed::FromStreamPriv<(K, V)> for BTreeMap<K, V> {
200 type InternalCollection = BTreeMap<K, V>;
201
202 fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> BTreeMap<K, V> {
203 BTreeMap::new()
204 }
205
206 fn extend(_: sealed::Internal, collection: &mut BTreeMap<K, V>, (key, value): (K, V)) -> bool {
207 collection.insert(key, value);
208 true
209 }
210
211 fn finalize(_: sealed::Internal, collection: &mut BTreeMap<K, V>) -> BTreeMap<K, V> {
212 mem::take(collection)
213 }
214}
215
216impl<T: Eq + Hash> FromStream<T> for HashSet<T> {}
217
218impl<T: Eq + Hash> sealed::FromStreamPriv<T> for HashSet<T> {
219 type InternalCollection = HashSet<T>;
220
221 fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> HashSet<T> {
222 HashSet::with_capacity(lower)
223 }
224
225 fn extend(_: sealed::Internal, collection: &mut HashSet<T>, item: T) -> bool {
226 collection.insert(item);
227 true
228 }
229
230 fn finalize(_: sealed::Internal, collection: &mut HashSet<T>) -> HashSet<T> {
231 mem::take(collection)
232 }
233}
234
235impl<K: Eq + Hash, V> FromStream<(K, V)> for HashMap<K, V> {}
236
237impl<K: Eq + Hash, V> sealed::FromStreamPriv<(K, V)> for HashMap<K, V> {
238 type InternalCollection = HashMap<K, V>;
239
240 fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> HashMap<K, V> {
241 HashMap::with_capacity(lower)
242 }
243
244 fn extend(_: sealed::Internal, collection: &mut HashMap<K, V>, (key, value): (K, V)) -> bool {
245 collection.insert(key, value);
246 true
247 }
248
249 fn finalize(_: sealed::Internal, collection: &mut HashMap<K, V>) -> HashMap<K, V> {
250 mem::take(collection)
251 }
252}
253
254impl<T: Ord> FromStream<T> for BinaryHeap<T> {}
255
256impl<T: Ord> sealed::FromStreamPriv<T> for BinaryHeap<T> {
257 type InternalCollection = BinaryHeap<T>;
258
259 fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> BinaryHeap<T> {
260 BinaryHeap::with_capacity(lower)
261 }
262
263 fn extend(_: sealed::Internal, collection: &mut BinaryHeap<T>, item: T) -> bool {
264 collection.push(item);
265 true
266 }
267
268 fn finalize(_: sealed::Internal, collection: &mut BinaryHeap<T>) -> BinaryHeap<T> {
269 mem::take(collection)
270 }
271}
272
273impl<T> FromStream<T> for Box<[T]> {}
274
275impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
276 type InternalCollection = Vec<T>;
277
278 fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> {
279 <Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper)
280 }
281
282 fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
283 <Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item)
284 }
285
286 fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> {
287 <Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection)
288 .into_boxed_slice()
289 }
290}
291
292impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {}
293
294impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E>
295where
296 U: FromStream<T>,
297{
298 type InternalCollection = Result<U::InternalCollection, E>;
299
300 fn initialize(
301 _: sealed::Internal,
302 lower: usize,
303 upper: Option<usize>,
304 ) -> Result<U::InternalCollection, E> {
305 Ok(U::initialize(sealed::Internal, lower, upper))
306 }
307
308 fn extend(
309 _: sealed::Internal,
310 collection: &mut Self::InternalCollection,
311 item: Result<T, E>,
312 ) -> bool {
313 assert!(collection.is_ok());
314 match item {
315 Ok(item) => {
316 let collection = collection.as_mut().ok().expect("invalid state");
317 U::extend(sealed::Internal, collection, item)
318 }
319 Err(err) => {
320 *collection = Err(err);
321 false
322 }
323 }
324 }
325
326 fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> {
327 if let Ok(collection) = collection.as_mut() {
328 Ok(U::finalize(sealed::Internal, collection))
329 } else {
330 let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0))));
331
332 Err(res.map(drop).unwrap_err())
333 }
334 }
335}
336
337pub(crate) mod sealed {
338 #[doc(hidden)]
339 pub trait FromStreamPriv<T> {
340 type InternalCollection;
344
345 fn initialize(
347 internal: Internal,
348 lower: usize,
349 upper: Option<usize>,
350 ) -> Self::InternalCollection;
351
352 fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool;
356
357 fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self;
359 }
360
361 #[allow(missing_debug_implementations)]
362 pub struct Internal;
363}