Swap poll_fn to allow polling exited tasks
This commit is contained in:
		
							parent
							
								
									edb8f21a74
								
							
						
					
					
						commit
						8fd08b1e97
					
				| @ -202,16 +202,29 @@ impl<F: Future + 'static> TaskStorage<F> { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     unsafe fn poll_to_despawn(p: TaskRef) { | ||||||
|  |         // The task's future has already been dropped, we just mark it as `!SPAWNED`.
 | ||||||
|  |         let this = &*p.as_ptr().cast::<TaskStorage<F>>(); | ||||||
|  |         this.raw.state.despawn(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     unsafe fn poll(p: TaskRef) { |     unsafe fn poll(p: TaskRef) { | ||||||
|         let this = &*(p.as_ptr() as *const TaskStorage<F>); |         let this = &*p.as_ptr().cast::<TaskStorage<F>>(); | ||||||
| 
 | 
 | ||||||
|         let future = Pin::new_unchecked(this.future.as_mut()); |         let future = Pin::new_unchecked(this.future.as_mut()); | ||||||
|         let waker = waker::from_task(p); |         let waker = waker::from_task(p); | ||||||
|         let mut cx = Context::from_waker(&waker); |         let mut cx = Context::from_waker(&waker); | ||||||
|         match future.poll(&mut cx) { |         match future.poll(&mut cx) { | ||||||
|             Poll::Ready(_) => { |             Poll::Ready(_) => { | ||||||
|  |                 waker.wake_by_ref(); | ||||||
|  | 
 | ||||||
|  |                 // As the future has finished and this function will not be called
 | ||||||
|  |                 // again, we can safely drop the future here.
 | ||||||
|                 this.future.drop_in_place(); |                 this.future.drop_in_place(); | ||||||
|                 this.raw.state.despawn(); | 
 | ||||||
|  |                 // We replace the poll_fn with a despawn function, so that the task is cleaned up
 | ||||||
|  |                 // when the executor polls it next.
 | ||||||
|  |                 this.raw.poll_fn.set(Some(Self::poll_to_despawn)); | ||||||
|             } |             } | ||||||
|             Poll::Pending => {} |             Poll::Pending => {} | ||||||
|         } |         } | ||||||
|  | |||||||
| @ -81,16 +81,8 @@ impl RunQueue { | |||||||
|             // safety: there are no concurrent accesses to `next`
 |             // safety: there are no concurrent accesses to `next`
 | ||||||
|             next = unsafe { task.header().run_queue_item.next.get() }; |             next = unsafe { task.header().run_queue_item.next.get() }; | ||||||
| 
 | 
 | ||||||
|             let run_task = task.header().state.run_dequeue(); |             task.header().state.run_dequeue(); | ||||||
| 
 |  | ||||||
|             if run_task { |  | ||||||
|                 // If task is not running, ignore it. This can happen in the following scenario:
 |  | ||||||
|                 //   - Task gets dequeued, poll starts
 |  | ||||||
|                 //   - While task is being polled, it gets woken. It gets placed in the queue.
 |  | ||||||
|                 //   - Task poll finishes, returning done=true
 |  | ||||||
|                 //   - RUNNING bit is cleared, but the task is already in the queue.
 |  | ||||||
|             on_task(task); |             on_task(task); | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|     } |  | ||||||
| } | } | ||||||
|  | |||||||
| @ -63,19 +63,12 @@ impl RunQueue { | |||||||
|             // If the task re-enqueues itself, the `next` pointer will get overwritten.
 |             // If the task re-enqueues itself, the `next` pointer will get overwritten.
 | ||||||
|             // Therefore, first read the next pointer, and only then process the task.
 |             // Therefore, first read the next pointer, and only then process the task.
 | ||||||
| 
 | 
 | ||||||
|             let run_task = critical_section::with(|cs| { |             critical_section::with(|cs| { | ||||||
|                 next = task.header().run_queue_item.next.borrow(cs).get(); |                 next = task.header().run_queue_item.next.borrow(cs).get(); | ||||||
|                 task.header().state.run_dequeue(cs) |                 task.header().state.run_dequeue(cs); | ||||||
|             }); |             }); | ||||||
| 
 | 
 | ||||||
|             if run_task { |  | ||||||
|                 // If task is not running, ignore it. This can happen in the following scenario:
 |  | ||||||
|                 //   - Task gets dequeued, poll starts
 |  | ||||||
|                 //   - While task is being polled, it gets woken. It gets placed in the queue.
 |  | ||||||
|                 //   - Task poll finishes, returning done=true
 |  | ||||||
|                 //   - RUNNING bit is cleared, but the task is already in the queue.
 |  | ||||||
|             on_task(task); |             on_task(task); | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|     } |  | ||||||
| } | } | ||||||
|  | |||||||
| @ -52,8 +52,7 @@ impl State { | |||||||
| 
 | 
 | ||||||
|     /// Unmark the task as run-queued. Return whether the task is spawned.
 |     /// Unmark the task as run-queued. Return whether the task is spawned.
 | ||||||
|     #[inline(always)] |     #[inline(always)] | ||||||
|     pub fn run_dequeue(&self) -> bool { |     pub fn run_dequeue(&self) { | ||||||
|         let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); |         self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel); | ||||||
|         state & STATE_SPAWNED != 0 |  | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  | |||||||
| @ -75,11 +75,9 @@ impl State { | |||||||
| 
 | 
 | ||||||
|     /// Unmark the task as run-queued. Return whether the task is spawned.
 |     /// Unmark the task as run-queued. Return whether the task is spawned.
 | ||||||
|     #[inline(always)] |     #[inline(always)] | ||||||
|     pub fn run_dequeue(&self) -> bool { |     pub fn run_dequeue(&self) { | ||||||
|         compiler_fence(Ordering::Release); |         compiler_fence(Ordering::Release); | ||||||
| 
 | 
 | ||||||
|         let r = self.spawned.load(Ordering::Relaxed); |  | ||||||
|         self.run_queued.store(false, Ordering::Relaxed); |         self.run_queued.store(false, Ordering::Relaxed); | ||||||
|         r |  | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  | |||||||
| @ -67,11 +67,7 @@ impl State { | |||||||
| 
 | 
 | ||||||
|     /// Unmark the task as run-queued. Return whether the task is spawned.
 |     /// Unmark the task as run-queued. Return whether the task is spawned.
 | ||||||
|     #[inline(always)] |     #[inline(always)] | ||||||
|     pub fn run_dequeue(&self) -> bool { |     pub fn run_dequeue(&self, cs: CriticalSection<'_>) { | ||||||
|         self.update(|s| { |         self.update_with_cs(cs, |s| *s &= !STATE_RUN_QUEUED) | ||||||
|             let ok = *s & STATE_SPAWNED != 0; |  | ||||||
|             *s &= !STATE_RUN_QUEUED; |  | ||||||
|             ok |  | ||||||
|         }) |  | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user