Scheduler是如何运作的

date
Mar 8, 2022
slug
react-scheduler
status
Published
tags
React
源码
summary
从源码解析Scheduler运作原理
type
Post
本文基于React17.0.0
在React中,scheduler实现了任务中断和时间分片,管理着任务的调度。

调度的核心

// 调度相关
export let requestHostCallback; // 请求及时回调: port.postMessage
export let cancelHostCallback; // 取消及时回调: scheduledHostCallback = null
export let requestHostTimeout; // 请求延时回调: setTimeout
export let cancelHostTimeout; // 取消延时回调: cancelTimeout
// 时间切片相关
export let shouldYieldToHost; // 是否让出主线程(currentTime >= deadline && needsPaint): 让浏览器能够执行更高优先级的任务(如ui绘制, 用户输入等)
export let requestPaint; // 请求绘制: 设置 needsPaint = true
export let getCurrentTime; // 获取当前时间
export let forceFrameRate; // 强制设置 yieldInterval (让出主线程的周期). 这个函数虽然存在, 但是从源码来看, 几乎没有用到
SchedulerHostConfig.default.js中,会根据是否是浏览器环境对任务调度的实现方式做不同处理(MessageChannelsetTimeout
if (
  typeof window === 'undefined' ||
  typeof MessageChannel !== 'function'
) {
  // ...
} else {
	// ...
}

调度相关

performWorkUntilDeadline
在被请求回调(requestHostCallback)后,scheduledHostCallback会被赋值为传入的callback,接着会触发MessageChannelonmessage,执行performWorkUntilDeadline,最终执行scheduledHostCallback
  1. 获取当前时间
  1. 设置这个任务的deadline
  1. 执行回调,判断是否还有剩余任务
  1. 如果没有,退出;有的话通过port.postMessage发起新的调度
  1. 重置needPaint=false
/**
   * performWorkUntilDeadline内部会执行掉scheduledHostCallback,最后taskQueue被清空,
   * 这个过程中会涉及任务的中断和恢复、任务完成状态的判断
  */
  const performWorkUntilDeadline = () => {

    /**
     * requestHostCallback接收的callback会赋值给scheduledHostCallback,
     * 之后通过port1.postMessage触发,port2监听到执行回调performWorkUntilDeadline
     */
    if (scheduledHostCallback !== null) {
      const currentTime = getCurrentTime();
      // 计算deadline,deadline会参与到shouldYieldToHost(根据时间片去限制任务执行)的计算中
      deadline = currentTime + yieldInterval;
      // hasTimeRemaining表示任务是否还有剩余时间,它和时间片一起限制任务的执行。如果没有时间,
      // 或者任务的执行时间超出时间片限制了,那么中断任务。它的默认为true,表示一直有剩余时间
      // 因为MessageChannel的port在postMessage,是比setTimeout还靠前执行的宏任务,这意味着
      // 在这一帧开始时,总是会有剩余时间,所以现在中断任务只看时间片的了
      const hasTimeRemaining = true;
      try {
        // scheduledHostCallback去执行任务的函数,当任务因为时间片被打断时,它会返回true,表示
        // 还有任务,所以会再让scheduler调度一个执行者继续执行任务
        // scheduledHostCallback 由requestHostCallback 赋值为flushWork
        const hasMoreWork = scheduledHostCallback(
          hasTimeRemaining,
          currentTime,
        );
        if (!hasMoreWork) {
          // 如果没有任务了,停止调度
          isMessageLoopRunning = false;
          scheduledHostCallback = null;
        } else {
          // 如果还有任务,继续让scheduler调度执行者

          port.postMessage(null);
        }
      } catch (error) {

        port.postMessage(null);
        throw error;
      }
    } else {
      isMessageLoopRunning = false;
    }
    needsPaint = false;
  };

  const channel = new MessageChannel();
  const port = channel.port2;
  channel.port1.onmessage = performWorkUntilDeadline;

  requestHostCallback = function(callback) {

    scheduledHostCallback = callback;
    /**
     * performWorkUntilDeadline里面将isMessageLoopRunning设为false有两处:
     * 1.判断scheduledHostCallback为null
     * 2.或者判断scheduledHostCallback不为null,执行scheduledHostCallback返回结果为null
     */
    if (!isMessageLoopRunning) {
      isMessageLoopRunning = true;
      port.postMessage(null);
    }
  };

  cancelHostCallback = function() {
    scheduledHostCallback = null;
  };

  requestHostTimeout = function(callback, ms) {
    taskTimeoutID = setTimeout(() => {
      callback(getCurrentTime());
    }, ms);
  };

  cancelHostTimeout = function() {
    clearTimeout(taskTimeoutID);
    taskTimeoutID = -1;
  };

时间分片相关

// performce.now是否是函数
const hasPerformanceNow =
  typeof performance === 'object' && typeof performance.now === 'function';

// 获取当前的时间
if (hasPerformanceNow) {
  const localPerformance = performance;
  getCurrentTime = () => localPerformance.now();
} else {
  const localDate = Date;
  const initialTime = localDate.now();
  getCurrentTime = () => localDate.now() - initialTime;
}
// ...

  let isMessageLoopRunning = false;
  let scheduledHostCallback = null;
  let taskTimeoutID = -1;

  let yieldInterval = 5;
  let deadline = 0;

  const maxYieldInterval = 300;
  let needsPaint = false;

  if (
    enableIsInputPending &&
    navigator !== undefined &&
    navigator.scheduling !== undefined &&
    navigator.scheduling.isInputPending !== undefined
  ) {
    const scheduling = navigator.scheduling;
    // 是否让出主线程
    shouldYieldToHost = function() {
      const currentTime = getCurrentTime();
      if (currentTime >= deadline) {
        if (needsPaint || scheduling.isInputPending()) {
          // There is either a pending paint or a pending input.
          return true;
        }
        return currentTime >= maxYieldInterval;
      } else {
        // There's still time left in the frame.
        return false;
      }
    };

    // 请求绘制
    requestPaint = function() {
      needsPaint = true;
    };
  } else {
    shouldYieldToHost = function() {
      return getCurrentTime() >= deadline;
    };

    requestPaint = function() {};
  }
  // 根据当前屏幕的刷新率,修改yieldInterval
  forceFrameRate = function(fps) {
    if (fps < 0 || fps > 125) {
      // Using console['error'] to evade Babel and ESLint
      console['error'](
        'forceFrameRate takes a positive int between 0 and 125, ' +
          'forcing frame rates higher than 125 fps is not supported',
      );
      return;
    }
    if (fps > 0) {
      yieldInterval = Math.floor(1000 / fps);
    } else {
      // reset the framerate
      yieldInterval = 5;
    }
  };

scheduler的任务队列

taskQueuetimerQueue以小顶堆的数据结构存储
// Tasks are stored on a min heap
var taskQueue = [];
// 用于存放过期任务
var timerQueue = [];
SchedulerMinHeap.js
type Heap = Array<Node>;
type Node = {
  id: number,
  sortIndex: number,
};

export function push(heap: Heap, node: Node): void {
  const index = heap.length;
  heap.push(node);
  siftUp(heap, node, index);
}

export function peek(heap: Heap): Node | null {
  const first = heap[0];
  return first === undefined ? null : first;
}

export function pop(heap: Heap): Node | null {
  const first = heap[0];
  if (first !== undefined) {
    const last = heap.pop();
    if (last !== first) {
      heap[0] = last;
      siftDown(heap, last, 0);
    }
    return first;
  } else {
    return null;
  }
}

function siftUp(heap, node, i) {
  let index = i;
  while (true) {
    const parentIndex = (index - 1) >>> 1;
    const parent = heap[parentIndex];
    if (parent !== undefined && compare(parent, node) > 0) {
      // The parent is larger. Swap positions.
      heap[parentIndex] = node;
      heap[index] = parent;
      index = parentIndex;
    } else {
      // The parent is smaller. Exit.
      return;
    }
  }
}

function siftDown(heap, node, i) {
  let index = i;
  const length = heap.length;
  while (index < length) {
    const leftIndex = (index + 1) * 2 - 1;
    const left = heap[leftIndex];
    const rightIndex = leftIndex + 1;
    const right = heap[rightIndex];

    // If the left or right node is smaller, swap with the smaller of those.
    if (left !== undefined && compare(left, node) < 0) {
      if (right !== undefined && compare(right, left) < 0) {
        heap[index] = right;
        heap[rightIndex] = node;
        index = rightIndex;
      } else {
        heap[index] = left;
        heap[leftIndex] = node;
        index = leftIndex;
      }
    } else if (right !== undefined && compare(right, node) < 0) {
      heap[index] = right;
      heap[rightIndex] = node;
      index = rightIndex;
    } else {
      // Neither child is smaller. Exit.
      return;
    }
  }
}

function compare(a, b) {
  // Compare sort index first, then task id.
  const diff = a.sortIndex - b.sortIndex;
  return diff !== 0 ? diff : a.id - b.id;
}
scheduleCallback:会根据reconciler调用时传入的优先级和callback,返回新创建的task
function unstable_scheduleCallback(priorityLevel, callback, options) {

  // 获取当前时间,它是计算任务开始时间、过期时间和判断任务是否过期的依据
  var currentTime = getCurrentTime();
  // 确定当前时间 startTime 和延迟更新时间 timeout
  var startTime;
  // 从options中尝试获取delay,也就是推迟时间
  if (typeof options === 'object' && options !== null) {
    var delay = options.delay;
    if (typeof delay === 'number' && delay > 0) {
      // 如果有delay,那么任务开始时间就是当前时间加上delay
      startTime = currentTime + delay;
    } else {
      // 没有delay,任务开始时间就是当前时间,也就是任务需要立刻开始
      startTime = currentTime;
    }
  } else {
    startTime = currentTime;
  }
  // 将传入的优先级,转换成过期时间间隔
  var timeout;
  switch (priorityLevel) {
    case ImmediatePriority:
      timeout = IMMEDIATE_PRIORITY_TIMEOUT; // -1
      break;
    case UserBlockingPriority:
      timeout = USER_BLOCKING_PRIORITY_TIMEOUT; // 250
      break;
    case IdlePriority:
      timeout = IDLE_PRIORITY_TIMEOUT; // 1073741823 ms
      break;
    case LowPriority:
      timeout = LOW_PRIORITY_TIMEOUT; // 10000
      break;
    case NormalPriority:
    default:
      timeout = NORMAL_PRIORITY_TIMEOUT; // 5000
      break;
  }
  // 计算任务的过期时间,任务开始时间 + timeout
  // 若是立即执行的优先级(ImmediatePriority),
  // 它的过期时间是startTime - 1比当前时间还短,表示已经过期,需要立即被执行
  var expirationTime = startTime + timeout;
  // 创建调度任务
  var newTask = {
    id: taskIdCounter++,
    // 任务本体
    callback,
    // 任务优先级
    priorityLevel,
    // 任务开始的时间,表示任务何时才能执行
    startTime,
    // expirationTime: task的过期时间, 优先级越高 expirationTime = startTime + timeout 越小
    expirationTime,
    // 在小顶堆队列中的排序,越小的越前
    sortIndex: -1,
  };
  if (enableProfiling) {
    newTask.isQueued = false;
  }
  // 如果是延迟任务则将 newTask 放入延迟调度队列(timerQueue)并执行 requestHostTimeout
  // 如果是正常任务则将 newTask 放入正常调度队列(taskQueue)并执行 requestHostCallback

  // 如果任务未过期,则将 newTask 放入timerQueue, 调用requestHostTimeout,
  // 目的是在timerQueue中检查排在最前面的任务的开始时间是否过期,
  // 过期则立刻将任务加入taskQueue,开始调度
  if (startTime > currentTime) {
    // This is a delayed task.
    newTask.sortIndex = startTime;
    push(timerQueue, newTask);
    if (peek(taskQueue) === null && newTask === peek(timerQueue)) {
      // 如果现在taskQueue中没有任务,并且当前的任务是timerQueue中的第一个
      // 那么调用requestHostTimeout,检查timerQueue中是否有要放到taskQueue中的任务
      if (isHostTimeoutScheduled) {
        cancelHostTimeout();
      } else {
        isHostTimeoutScheduled = true;
      }
      // 会把handleTimeout放到setTimeout里,在startTime - currentTime时间之后执行
      // 待会再调度
      requestHostTimeout(handleTimeout, startTime - currentTime);
    }
  } else {
    // 任务已经过期,以过期时间作为taskQueue排序的依据
    newTask.sortIndex = expirationTime;
    // taskQueue是最小堆,而堆内又是根据sortIndex(也就是expirationTime)进行排序的。
    // 可以保证优先级最高(expirationTime最小)的任务排在前面被优先处理。
    // 将新任务加入任务队列
    push(taskQueue, newTask);
    if (enableProfiling) {
      markTaskStart(newTask, currentTime);
      newTask.isQueued = true;
    }
    // 调度一个主线程回调,如果已经执行了一个任务,等到下一次交还执行权的时候再执行回调。
    // 立即调度
    if (!isHostCallbackScheduled && !isPerformingWork) {
      // isPerformingWork在flushWork一开始设置为true,调用完后设置为false
      isHostCallbackScheduled = true;
      /**
       * requestHostCallback里面会将scheduledHostCallback设置为flushWork
       * 这里会通过postMessage调度一个任务,port.onMessage在下一个事件循环开始执行回调performWorkUntilDeadline,
       * 里面会执行flushWork,使用flushWork去执行taskQueue
       */
      requestHostCallback(flushWork);
    }
  }

  return newTask;
}
flushWork最终会调用任务循环workLoop,并在finally中还原一些全局标记
function flushWork(hasTimeRemaining, initialTime) {

  if (enableProfiling) {
    markSchedulerUnsuspended(initialTime);
  }

  isHostCallbackScheduled = false;
  if (isHostTimeoutScheduled) {
    isHostTimeoutScheduled = false;
    cancelHostTimeout();
  }

  isPerformingWork = true;
  const previousPriorityLevel = currentPriorityLevel;
  try {
    if (enableProfiling) {
      try {
        // workLoop的调用使得任务最终被执行
        return workLoop(hasTimeRemaining, initialTime);
      } catch (error) {
        if (currentTask !== null) {
          const currentTime = getCurrentTime();
          markTaskErrored(currentTask, currentTime);
          currentTask.isQueued = false;
        }
        throw error;
      }
    } else {
      return workLoop(hasTimeRemaining, initialTime);
    }
  } finally {
    // 运行完后会还原全局标记
    currentTask = null;
    currentPriorityLevel = previousPriorityLevel;
    isPerformingWork = false;
    if (enableProfiling) {
      const currentTime = getCurrentTime();
      markSchedulerSuspended(currentTime);
    }
  }
}
最后来看workLoop
/** workLoop是通过判断任务函数的返回值去识别任务的完成状态的,true表示完成,false没完成 */
function workLoop(hasTimeRemaining, initialTime) {

  let currentTime = initialTime;
  // 开始执行前检查一下timerQueue中的过期任务,放到taskQueue中
  advanceTimers(currentTime);
  // 获取taskQueue中最优先的任务
  currentTask = peek(taskQueue);
  // 循环taskQueue,执行任务
  while (currentTask !== null) {
    if (
      currentTask.expirationTime > currentTime &&
      (!hasTimeRemaining || shouldYieldToHost())
    ) {
      // hasTimeRemaining一直为true,这与MessageChannel作为宏任务的执行时机有关
      // 当前任务没有过期(urrentTask.expirationTime > currentTime),但是已经到了时间片的末尾,需要中断循环
      // 使得本次循环下面currentTask执行的逻辑都不能被执行到(此处是中断任务的关键)
      break;
    }
    // 执行任务 ---------------------------------------------------
    // 获取任务的执行函数,这个callback就是React传给Scheduler的任务。例如:performConcurrentWorkOnRoot
    const callback = currentTask.callback;
    if (typeof callback === 'function') {
      // 如果执行函数为function,说明还有任务可做,调用它
      currentTask.callback = null;
      // 获取任务的优先级
      currentPriorityLevel = currentTask.priorityLevel;
      // 任务是否过期
      const didUserCallbackTimeout = currentTask.expirationTime <= currentTime;
      markTaskRun(currentTask, currentTime);
      // 获取任务函数的执行结果
      const continuationCallback = callback(didUserCallbackTimeout);
      currentTime = getCurrentTime();
      if (typeof continuationCallback === 'function') {
        // 检查callback的执行结果返回的是不是函数,如果返回的是函数,则将这个函数作为当前任务新的回调。
        // concurrent模式下,callback是performConcurrentWorkOnRoot,其内部根据当前调度的任务
        // 是否相同,来决定是否返回自身,如果相同,则说明还有任务没做完,返回自身,其作为新的callback
        // 被放到当前的task上。while循环完成一次之后,检查shouldYieldToHost,如果需要让出执行权,
        // 则中断循环,走到下方,判断currentTask不为null,返回true,说明还有任务,回到performWorkUntilDeadline
        // 中,判断还有任务,继续port.postMessage(null),调用监听函数performWorkUntilDeadline,继续执行任务
        currentTask.callback = continuationCallback;
        markTaskYield(currentTask, currentTime);
      } else {
        // 返回不是函数,说明当前任务执行完了
        if (enableProfiling) {
          markTaskCompleted(currentTask, currentTime);
          currentTask.isQueued = false;
        }
        if (currentTask === peek(taskQueue)) {
          // 将当前任务清除
          pop(taskQueue);
        }
      }
      advanceTimers(currentTime);
    } else {
      // 任务的callback不是函数,说明被取消了(unstable_cancelCallback(task)会将任务的callback置为nul),则弹出
      pop(taskQueue);
    }
    // 从taskQueue中继续获取任务,如果上一个任务未完成,那么它将不会
    // 被从队列剔除,所以获取到的currentTask还是上一个任务,会继续去执行它
    currentTask = peek(taskQueue);
  }
  // return 的结果会作为 performWorkUntilDeadline 中hasMoreWork的依据
  // 高优先级任务完成后,currentTask.callback为null,任务从taskQueue中删除,此时队列中还有低优先级任务,
  // currentTask = peek(taskQueue)  currentTask不为空,说明还有任务,继续postMessage执行workLoop,
  // 但它被取消过,导致currentTask.callback为null
  // 所以会被删除,此时的taskQueue为空,低优先级的任务重新调度,加入taskQueue
  if (currentTask !== null) {
    // 如果currentTask不为空,说明是时间片不够用,导致了任务中断,只是被中止了,
    // return 一个 true告诉外部,此时任务还未执行完,还有任务,
    return true;
  } else {
    // 如果currentTask为空,说明taskQueue队列中的任务已经都
    // 执行完了,然后从timerQueue中找任务,调用requestHostTimeout
    // 去把task放到taskQueue中,到时会再次发起调度,但是这次,
    // 会先return false,告诉外部当前的taskQueue已经清空,
    // 先停止执行任务,也就是终止任务调度
    const firstTimer = peek(timerQueue);
    if (firstTimer !== null) {
      requestHostTimeout(handleTimeout, firstTimer.startTime - currentTime);
    }
    return false;
  }
}

© kaba 2019 - 2023