How to use the dask.highlevelgraph.HighLevelGraph.from_collections function in dask

def dask(self):
        """Build the task graph for this object.

        Returns the result of ``HighLevelGraph.from_collections`` for a single
        layer named ``self._key`` that maps ``self._key`` to ``self._obj``,
        with no dependencies.
        """
        return HighLevelGraph.from_collections(
            self._key, {self._key: self._obj}, dependencies=()
        )
## Split output
    leaf_arrs = []
    for i, (ocd, oax, meta) in enumerate(zip(output_coredimss, output_axes, metas)):
        core_output_shape = tuple(core_shapes[d] for d in ocd)
        core_chunkinds = len(ocd) * (0,)
        output_shape = loop_output_shape + core_output_shape
        output_chunks = loop_output_chunks + core_output_shape
        leaf_name = "%s_%d-%s" % (name, i, token)
        leaf_dsk = {
            + key[1:]
            + core_chunkinds: ((getitem, key, i) if nout else key)
            for key in keys
        graph = HighLevelGraph.from_collections(leaf_name, leaf_dsk, dependencies=[tmp])
        meta = meta_from_array(meta, len(output_shape))
        leaf_arr = Array(
            graph, leaf_name, chunks=output_chunks, shape=output_shape, meta=meta

        ### Axes:
        if keepdims:
            slices = len(leaf_arr.shape) * (slice(None),) + len(oax) * (np.newaxis,)
            leaf_arr = leaf_arr[slices]

        tidcs = [None] * len(leaf_arr.shape)
        for i, oa in zip(range(-len(oax), 0), oax):
            tidcs[oa] = i
        j = 0
        for i in range(len(tidcs)):
            if tidcs[i] is None:
    # Barrier
    barrier_token = "barrier-" + always_new_token
    dsk3 = {barrier_token: (barrier, list(dsk2))}

    # Collect groups
    name = "shuffle-collect-" + token
    dsk4 = {
        (name, i): (collect, p, i, df._meta, barrier_token) for i in range(npartitions)

    divisions = (None,) * (npartitions + 1)

    layer = toolz.merge(dsk1, dsk2, dsk3, dsk4)
    graph = HighLevelGraph.from_collections(name, layer, dependencies=dependencies)

    return DataFrame(graph, name, df._meta, divisions)
token = tokenize(self, k, npartitions)
        name = "take-" + token

        if npartitions > 1:
            name_p = "take-partial-" + token

            dsk = {}
            for i in range(npartitions):
                dsk[(name_p, i)] = (list, (take, k, (, i)))

            concat = (toolz.concat, ([(name_p, i) for i in range(npartitions)]))
            dsk[(name, 0)] = (safe_take, k, concat, warn)
            dsk = {(name, 0): (safe_take, k, (, 0), warn)}

        graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])
        b = Bag(graph, name, 1)

        if compute:
            return tuple(b.compute())
            return b
s : (min(M, N),) Array
        Singular values of `a`.
    q, r = qr(a)
    x = solve_triangular(r,
    residuals = b -
    residuals = (residuals ** 2).sum(keepdims=True)

    token = tokenize(a, b)

    # r must be triangular with a single block

    # rank
    rname = "lstsq-rank-" + token
    rdsk = {(rname,): (np.linalg.matrix_rank, (, 0, 0))}
    graph = HighLevelGraph.from_collections(rname, rdsk, dependencies=[r])
    # rank must be an integer
    rank = Array(graph, rname, shape=(), chunks=(), dtype=int)

    # singular
    sname = "lstsq-singular-" + token
    rt = r.T
    sdsk = {
        (sname, 0): (
            (np.sqrt, (np.linalg.eigvals, (, (, 0, 0), (, 0, 0)))),
    graph = HighLevelGraph.from_collections(sname, sdsk, dependencies=[rt])
    _, _, _, ss = np.linalg.lstsq(
        np.array([[1, 0], [1, 2]], dtype=a.dtype),
        np.array([0, 1], dtype=b.dtype),
            left_depth = depth
            right_depth = depth

        if len(bds) == 1:
            left = [bds[0] + right_depth]
            right = [bds[-1] + left_depth]
            mid = []
            for bd in bds[1:-1]:
                mid.append(bd + left_depth + right_depth)
            chunks.append(left + mid + right)

    dsk = merge(interior_slices, overlap_blocks)
    graph = HighLevelGraph.from_collections(name, dsk, dependencies=[x])

    return Array(graph, name, chunks, meta=x)
else o
                for o in out_parts
            + (return_inverse,)
    out_dtype = [("values", ar.dtype)]
    if return_index:
        out_dtype.append(("indices", np.intp))
    if return_inverse:
        out_dtype.append(("inverse", np.intp))
    if return_counts:
        out_dtype.append(("counts", np.intp))

    dependencies = [o for o in out_parts if hasattr(o, "__dask_keys__")]
    graph = HighLevelGraph.from_collections(name, dsk, dependencies=dependencies)
    chunks = ((np.nan,),)
    out = Array(graph, name, chunks, out_dtype)

    # Split out all results to return to the user.

    result = [out["values"]]
    if return_index:
    if return_inverse:
        # Using the returned unique values and arange of unknown length, find
        # each value matching a unique value and replace it with its
        # corresponding index or `0`. There should be only one entry for this
        # index in axis `1` (the one of unknown length). Reduce axis `1`
        # through summing to get an array with known dimensionality and the
        # mapping of the original values.
        mtches = (ar[:, None] == out["values"][None, :]).astype(np.intp)
>>> b.starmap(myadd, z=max_second).compute()
        [13, 17, 21, 25, 29]
        name = "{0}-{1}".format(
            funcname(func), tokenize(self, func, "starmap", **kwargs)
        dependencies = [self]
        if kwargs:
            kwargs, collections = unpack_scalar_dask_kwargs(kwargs)

        dsk = {
            (name, i): (reify, (starmap_chunk, func, (, i), kwargs))
            for i in range(self.npartitions)
        graph = HighLevelGraph.from_collections(name, dsk, dependencies=dependencies)
        return type(self)(graph, name, self.npartitions)
                    (name_cum, i - 1),
                    (cumlast._name, i - 1),
            dask[(name, i)] = (
                (cumpart_ext._name, i),
                (name_cum, i),
                0 if columns is None else columns,
        graph = HighLevelGraph.from_collections(
            name, dask, dependencies=[cumpart_raw, cumpart_ext, cumlast]
        return new_dd_object(graph, name, chunk(self._meta), self.obj.divisions)