头像

Okamasa店老板




离线:1天前


最近来访(366)
用户头像
诚_7
用户头像
20220329
用户头像
Jettblue
用户头像
abcla
用户头像
Juicy
用户头像
Txoon
用户头像
光風霽月
用户头像
云代码
用户头像
太雪菜
用户头像
LittleBunny
用户头像
阿蒙
用户头像
A越努力越幸运
用户头像
胡图图
用户头像
kkklll
用户头像
上术
用户头像
Evil_Vice
用户头像
gangan
用户头像
无秋物语
用户头像
球球AC成功
用户头像
wlyg


仔细研读了各类经典秘籍之后,本王决定找到一只菜鸟勇者练练手.

众所周知,大陆有无穷个平行宇宙的版本,有些地方的勇者技术精湛,能将能力榨干到极限;
也有菜鸟勇者,CD超长,魔术稀烂,完全不是本王的对手。

不如趁着周中勇者都在上班,抓个找不到工作的弱鸡勇者吧!

$\color{blue}{🗡🗡🗡}\color{gold}{勇者の试炼: 归约}$

与勇者比拼,无非速度,耐力,攻击力。比如如何快速地计算1e8次攻击的总伤害,好安排下一次的攻击。

一个菜鸟勇者会不加思索地写道:

const int N = 1e8;
int main() {
    int sum = 0;
    int *a;
    get_actions(a, N); //生成N个值
    for (int i=0; i<N; i++) {
        sum += a[i];
    }
    return 0;
}

正合我意。在他挨个记数的时候,60ms已经过去。按照本王先前的精细化管理经验, 这世间足够成千上万的小兵进行攻击了。

$\color{blue}{😈😈😈}\color{gold}{魔王の挑战}$

从cuda大陆的认真笔记中,我了解到,要打败勇者,需要考虑这么几个方面:

  • 计算密度
  • 存储类型
  • 合并度

计算密度在这个基础的挑战中毫无用武之地,毕竟只是单纯的加法全局考量。

于是本王将目光转向存储类型。

存储类型决定着读取记忆的快慢。假设小妖们要谋划A+B,那脑中必定需要首先回忆起A和B。

小兵们所使用的记忆(memory), 既有$\color{blue}{【全局内存】}$, 也有$\color{blue}{【共享内存】}$

  • 全局内存:所有线程都能访问其中数据,容量很大(G级别),访问较慢。
  • 共享内存:一个块中的线程能够访问,速度较快,容量有限(KB级别)。

在这里,不同的小兵将结成阵列(block),共享一块存储,如此一来,他们回忆的速度将会加快....

所以shared memory正是本王所要的。。。

正待本王组织动手之际。。。

$\color{red}{🗡🗡🗡}\color{gold}{勇者の反击!}$

吾等勇者,行侠仗义,救NPC于危难,免其他勇者于冒险。

听闻菜鸟勇者被绑,吾等岂能坐视。

这试炼如此简单,若不好好表现,岂不辱没吾等勇者威名!

吾等十名勇者,自愿结阵,前来相助!

#include <memory>
#include <stdio.h>
#include <stdlib.h>
#include <omp.h>
#include <chrono>

const long N = 1e8;
void get_actions(int *a, int n) {
    for (long i=0; i<n; i++) {
        a[i] = i % 1000;
    }
}
int main() {
    long long sum = 0;
    int *a = new int[N];
    get_actions(a, N); //生成N个值
    printf("N is %lld\n", N);
    auto t1 = std::chrono::steady_clock::now();
    for (int i=0; i<10; i++) {
        printf("a %d\n", a[i]);
    }
long i;
#pragma omp parallel for private(i)
    for (i=0; i<N; i++) {
        sum += a[i];
    }
    auto t2 = std::chrono::steady_clock::now();
    auto diff = std::chrono::duration<double>(t2 - t1);
    printf("add cost %lf ms, total sum %lld\n", diff.count() * 1000, sum);
    delete[] a;
    return 0;
}

export OMP_NUM_THREADS=10

勇者们说完一番废言废语,十人接力起来,再看耗时,已经降到了10ms。

$\color{blue}{😈😈😈}\color{gold}{魔王の外挂!}$

情知不能再拖!若是勇者们人数再增加,此次挑战将完败。魔界的魔威何在?

然而,要从头开始打造小兵,申请头脑,按前文的步骤,检查错误,考核绩效,一时半会,绝不能完成......

也不是只有勇者才有外援!本魔王也有外挂thrust,可以一战!

#include <memory>
#include <stdio.h>
#include <stdlib.h>
#include <omp.h>
#include <chrono>
#include <thrust/device_vector.h>
#include <thrust/host_vector.h>


using ll = long long;
const long N = 1e8;
void get_actions(thrust::host_vector<ll> &a, int n) {
    for (long i=0; i<n; i++) {
        a[i] = i % 1000;
    }
}
int main() {
    ll sum = 0;
    thrust::host_vector<ll> a(N);
    get_actions(a, N); //生成N个值
    thrust::device_vector<ll> d_a(N);
    d_a = a;
    printf("N is %lld\n", N);
    auto t1 = std::chrono::steady_clock::now();
    sum = thrust::reduce(d_a.begin(), d_a.end(), (ll)0, thrust::plus<ll>());  
    auto t2 = std::chrono::steady_clock::now();
    auto diff = std::chrono::duration<double>(t2 - t1);
    printf("add cost %lf ms, total sum %lld\n", diff.count() * 1000, sum);
    return 0;
}

与勇者类似的篇幅里,本王一王完成了同样的题目。耗时,$\Large{\color{red}{6ms}}$。

$$\Large{🏆🏆🏆🏆🏆🏆🏆}: 魔王!$$


关于thrust的传奇,让本魔王番外来讲。




点赞链接 👈

游戏链接 👉 愤怒的小鸟微恐版

本文对这个游戏进行批评与自我批评


游戏框架

MONKEY PATCH

可以看出这是一个非常不规范的游戏,完全不是按照Y总的教程来的....

我以为的:

draw2.png

acwing要求的:

draw3.png

所以…做了一个hack,把index的内容写进了js。

至于我为什么会这么以为,因为在github上搜到了一个类似的项目,在那基础上改的。而且直接打开index.html配置很方便…

游戏组成

游戏内容主要有几个类:

  • Game: 定义游戏流程
  • Level: 定义关卡
  • Entity: 定义游戏中的物体特性

另外还有一些辅助类,帮助完成资源加载、绑定和计算。其中计算碰撞和反弹用到的是:

  • box2d: 用于计算碰撞

Game

定义了游戏的整个流程, 这点遵从原项目的设计,我没有做改动:

  • Preclude: 加载背景图和字幕
  • Intro: 关卡刚刚载入,游戏将在整个关卡范围内平移游戏画面,向玩家展现关卡中的所有东西
  • Load-next-hero: 检查是否有下一个英雄可以装填到弹弓上去,如果有,装填该英雄。如果我们的英雄耗尽了而坏蛋却没有被全部消灭,关卡就结束了
  • Wait-for-firing: 将视野移回到弹弓,等待玩家发射“英雄”。此时,游戏正在等待玩家单击英雄。在这个阶段,玩家可以也很有可能用鼠标拖拽画面,查看整个关卡
  • Firing: 在这个阶段,玩家已经单击了英雄,但还没有释放鼠标按键。此时,游戏正在等待玩家拖拽英雄,调整角度和位置释放英雄
  • Fired: 玩家释放了鼠标按键并发射英雄之后进入这个阶段。此时,游戏将所有的事情交给物理引擎来处理,用户仅仅在观看。游戏画面会随着发射出的英雄平移

Levels

Levels开始,可以发挥个人想象力了。

每个Levels定义的内容有:

  • 每关的前景图和背景图
  • 开始时候的提示语
  • 关卡内容(地图角色,英雄角色,坏蛋角色)

Entities

无论是障碍物,敌人,还是子弹,都可以被视为一个Entities。

draw5.png

每个entity会有属性,以及对应的图像;shape分两种,circle和rectangle;圆形的需要设定半径radius,矩形的需要设定width和height;isStatic表示是否要参与运动的计算(自由落体,滚动..),如果设置为false,可以设置出浮空的木板等;

对vilian障碍物还需要有health参数表示何时会消失,score表示这次能获得多少分数;另外还有一组物理参数(density密度, friction摩擦系数, restitution反弹系数),用来计算每次碰撞后的路径。

对hero,即填充发射的子弹,增加了额外的power参数,用来调整伤害值;同时删掉了score属性。

伤害计算

玩家发射时拉扯弹弓,给定初始impulse,其大小和距弹弓中心距离成正比。然后直接调用box2d的ApplyImpulse功能

var impulse = new b2Vec2((slignshotCenterX-mouse.x-game.offsetLeft)*impulseScaleFactor, (slingshotCenterY-mouse.y)*impulseScaleFactor);

hero.ApplyImpulse(impulse, WorldCenter);

碰撞事件通过注册box2d的callback来处理:

var listener = new Box2D.Dynamics.b2ContactListener();
listener.PostSolve = function(contact, impulse) {
    var body1 = contact.GetFixtureA().GetBody();
    var body2 = contact.GetFixtureB().GetBody();
    var entity1 = body1.GetUserData();
    var entity2 = body2.GetUserData();

    var impulseAlongNormal = Math.abs(impulse.normalImpulses[0]); // 法向冲撞力
    // 监听器被调用得有些太频繁了,滤去非常小的冲击
    // 尝试不同的值后,5 似乎比较好
    if (impulseAlongNormal > 5) {
    // 如果对象有生命值,用冲击值削弱生命值
    var harm = impulseAlongNormal;
    if (entity1.power) {
        harm = harm * entity1.power;
    } else if (entity2.power) {
        harm = harm * entity2.power;
    }
    if (entity1.health) {
        entity1.health -= harm;
    }

    if (entity2.health) {
        entity2.health -= harm;
    }
    ...
    // 处理其他的状态,播放音乐等
};
box2d.world.SetContactListener(listener);

过关条件及奖励

当vilian全部消失,而hero还在的话,游戏获胜;否则游戏失败。

分数比较随意。

超过特定关卡后可以解锁一些新的操作。

关卡解析

白色字体,请反选

Level 1

$\color\white{热身}$

Level 2

$\color\white{眼睛基本碰到就能消灭,一种方案是对着地板反弹过障碍物, 也可以等第四关后再来}$

Level 3

$\color\white{为第四关铺垫,瞄准最下面一排的障碍物打就行}$

Level 4

$\color\white{钢铁打不破,但是文字提示空白格。在等待发射时按空格,钢铁会变成玻璃,结构和第三关相同,这个能力可以带到之后的游戏}$

$\color\white{这时候回到第二关,可以用这个功能键把钢铁的障碍物换成玻璃的}$

Level 5

$\color\white{可以用快捷键把上方钢铁换成玻璃,然后击碎玻璃,让上面的障碍物自由落体。落体完可能还剩一些villian,需要留好子弹清场;}$
$\color\white{也可以利用上方的反弹清除一部分viliian,再使用自由落体}$




CUDA访问内存的效率将决定程序的效率。在此将介绍CUDA的几种存储和合理使用方式。

全局内存

对全局内存的访问将触发数据传输。传输的规则如下:

  • 每次传输的大小为32字节。

  • 首地址为128的整数倍。

可以用$\color{blue}{【合并度】}$这个量度来描述“被浪费的数据传输次数”。假设线程需要A Byte数据,结果因为数据对齐等原因传输了B Byte, 合并度 = A / B。

如果合并度不为1, 则称为$\color{yellow}{【非合并访问】}$。

在非合并访问不可避免的情况下,CUDA对非合并的读操作有缓存机制,所以应尽量保证写操作的合并度。


共享内存

共享内存与线程块对应,每个线程块都有自己的副本;不同线程块的值可以不同;相当于在线程块级别上的“全局内存”缓存。

共享内存大小有一定限制,可以声明固定大小或动态大小的共享内存:

void __global__ something() {
    __shared__ float s_y[128]; //在核函数内声明固定大小的共享内存
}

<<<grid_size, block_size, sizeof(real) * block_size>>> // 在核函数的执行配置指定第三个参数,给出shared大小

void __global__ something() {
    extern __shared__ float s_y[]; //在核函数内用extern修饰共享内存大小
}

不同线程对同一共享内存的访问,需要$\color{green}{__syncthreads()}来保证先后访问顺序。

例如:

for (int offset = blockDim.x >> 1; offset > 0; offset >>= 1) {
    if (tid < offset) {
        s_y[tid] += s_y[tid + offset];
    }
    __syncthreads();
}

这是一个数组reduce的例子。数组里的每个元素需要累加指定的offset值。syncthread保证每个线程都会先完成 offset=blockDim.x>>1累加。等所有线程都完成这一步后,才会累加blockDim.x >> 2;

利用共享内存可以改善全局内存的访问。可以以合并访问的形式,将数据拷贝到共享内存;然后在共享内存上进行操作,最后再以合并访问的形式写入全局内存;通过添加一层缓存,可以绕过输入和输出总会有一个非合并的问题。

另外,共享内存本身划分为多个Bank。多个线程同时访问一个Bank也将造成冲突。这也应该尽量避免。


原子函数

考虑如下代码:

if (tid == 0) {
    d_y[0] += s_y[0];
}

在每个block的Thread0里都会执行这一行,它被多个线程访问,但是访问顺序不确定。同CPU代码一样,这里需要有取出元素-相加-写入三步,会出现读写竞争。

和CPU端一样,CUDA提供了atomic操作,可以直接调用。

一个函数声明的例子如下:

T atomicAdd(T *address, T val)

address代表待累加的地址,val代表需要累加的函数值。




每个小妖都在干什么?
怎么让他们发挥出最大的骚扰勇者的潜力?
这是每一个有志之魔王需要考虑的问题。
因此,本王创建了⏲的使魔,附着在小怪物们身上。这样, 每只怪的效率也就一目了然了。


$\Large\color{blue}{【时计塔的使魔😈】}$

Cuda世界的时间流逝,在冒险大陆不可预知。
因此本王创建了使魔,指令开始之时,向cuda大陆记录时间,结束时再记录一次,一来一回,相减得到的就是耗时。
本王将其编写成了一套法阵,待我吟唱 ————

#include "utils.h"
#include <map>
/*
计时使魔:
开始计时时:调用Start;
结束计时时:调用Tick;
可以实现连续计时, 例如: 
Start();
doA;
Tick(); //得到A耗时
doB;
Tick(); //得到B耗时
*/

class TImeSerVItor {
public:
    TImeSerVItor() {
        CHECK(cudaEventCreate(&start_));
        CHECK(cudaEventCreate(&stop_));
    }
    ~TImeSerVItor() {
        CHECK(cudaEventDestroy(&start_));
        CHECK(cudaEventDestroy(&stop_));
    }
    void Start() {
        CHECK(cudaEventRecord(start_));
        cudaEventQuery(start_);
    }
    void Tick() {
        CHECK(cudaEventRecord(stop_));
        CHECK(cudaEventSynchronize(stop_));
        float elapsed_time;
        CHECK(cudaEventElapsedTime(&elapsed_time, start_, stop_));
        printf("total cost is %f\n", elapsed_time);
        CHECK(cudaEventRecord(start_));
        cudaEventQuery(start_);
    }
private:
    cudaEvent_t start_, stop_;
};


$\Large\color{pink}{【批量测试🏭】}$

有了⌚😈相助, 就能对魔兵分门别类,进行一一考核了。
以魔理揣度,每个魔兵思考计算得越多,用来对付勇者就越有效率。
若是本王千辛万苦申请来魔兵,他们单纯搬运指令,就走完了任务流程,那可就大大地浪费了。
初级魔兵需要思考,使用魔法A能造成广谱地图N个地址的伤害,魔法B会造成另外一些伤害。
那么,有多少伤害?

这个地图多大合适?每个小兵又处理多大范围合适?
为此本王设置了可变法阵,逐步记数;

/*处理相加的核函数*/
void __global__ add(double *x, double *y, double *z, const int N, const int step) {
    int idx = blockIdx.x;
    while (idx < N) {
        z[idx] = x[idx] + y[idx];
        idx += step;
    }
}
int main() {
    std::map<int, int> a;
    int sum = 0;
    int N = 10000;
    double **x, **y, **res;
    double src_x[1000000], src_y[1000000], src_z[1000000];
    for (int i=0; i<20000; i++) {
        src_x[i] = random() % 1000;
        src_y[i] = random() % 1000;
    }
    for (int step = N; step < 10*N; step += N) {
        CHECK(cudaMalloc((void **)&x, N));
        CHECK(cudaMalloc((void **)&y, N));
        CHECK(cudaMalloc((void **)&res, N));
        CHECK(cudaMemcpy((void*)x, src_x, N, ::cudaMemcpyDefault));
        CHECK(cudaMemcpy((void*)y, src_y, N, ::cudaMemcpyDefault));
        printf("dealing step N %d\n", step);
        TImeSerVItor ti;
        ti.Start();
        for (int i=0; i<N; i++) {
            src_z[i] = src_x[i] + src_y[i];
        }
        ti.Tick();
        int block_num = 1000;
        add<<< block_num, 1>>>(*x, *y, *res, N, (N / block_num) + 1);
        ti.Tick();
        CHECK(cudaFree(x));
        CHECK(cudaFree(y));
        CHECK(cudaFree(res));
        CHECK(cudaStreamSynchronize(0));
    }
}

$\Large\color{red}{【计算の王道】}$

以上的阵式只是一个例子,经过更为复杂的多轮验证,本王得到了以下结论:

  • 【算术强度】指令传输需要耗时,因此每个小兵需要尽量发挥复杂的计算效力,弥补搬运时的时间耗损
  • 【并发数量】cuda大陆下,各小兵自行其事,可以在资源足够的情况下分配更多的小兵;

另外,根据经验,往返两个大陆间的信息通道带宽,只有CUDA大陆内部的几十分之一,所以两方的数据传输需要尽可能少,哪怕是为此需要分配一些cuda大陆执行较慢的操作。

下一步,本王就该详细拆开每个使魔的构造,让他们更听话了。




见到了磅礴的妖魔海,勇者一定会被本魔王强大的力量震慑吧!

不过,当务之急,是找个安全的地方修炼。要是魔功走火入魔,威力大减,那可就大大的不好。

好在我与黑魔法之渊库达,呼吸与共,灵力相通。几番试探,我已了解黑魔法之渊种种禁忌。若有违反,轻则召唤失败,重则反噬魔王本营,再凶猛的魔王,也得谨慎行事!


$\Large\color{blue}{【召唤三诫】}$

✨ $\color{Gold}{有限魔物}:同时活动的魔物数量有限!$

✨ $\color{Gold}{契约访问}:不可访问请求之外的土地$!%

✨ $\color{Gold}{愚昧实体}:关于本大地的一切继承与依附在cuda中均不可用,它们也不支持可变的内存,在早期不能互相组合。$

三诫从魔王无数次尝试得来,要一一理解还为时过早。所有预言和箴言,在后续章节才会一一解释。


$\Large\color{pink}{【谦卑信使】}$

cuda的所有请求都不会被咆哮着拒绝。它会静默地放弃所有的不合理请求,就像渗入勇者团队的史莱姆和哥布林。

所以,作为魔王,首先要怀以谦卑的内心,为每次请求解析cuda的谜语,了解真实情况到底如何。

#define CHECK(call)

do \

{ \

const cudaError_t error_code = call;\

if (error_code != cudaSucess) \

{ \

printf("CUDA Error: \n"); \

printf(" File: %s\n", __FILE__); \

printf(" Line: %d\n", __LINE__); \

printf(" Error code: %d\n", error_code); \

printf(" Error text: %s\n", cudaGetErrorString(error_code)); \

exit(1); \

} \

} while (0)

这样,每次向cuda请法,本魔王都能得知是否触犯戒律。

譬如, 本王要把一切纳入麾下:

#include <stdio.h>
#include <stdlib.h>
#include <cuda_runtime.h>

#define CHECK(call) do \
{ \
const cudaError_t error_code = call;\
if (error_code != cudaSuccess) \
{ \
printf("CUDA Error: \n"); \
printf(" File: %s\n", __FILE__); \
printf(" Line: %d\n", __LINE__); \
printf(" Error code: %d\n", error_code); \
printf(" Error text: %s\n", cudaGetErrorString(error_code)); \
exit(1); \
} \
} while (0)

int main(void) {
    const int N = 9600000;
    const int M = 9600000;
    double *d_x;
    CHECK(cudaMalloc((void**)&d_x, 1000000000000)); //!!!!!!
    CHECK(cudaFree(d_x));
    CHECK(cudaDeviceSynchronize());
}

却遭到了冷酷无情的否定:

CUDA Error: 
 File: c.cu
 Line: 23
 Error code: 2
 Error text: out of memory

没有足够的空间容纳本王的万万亿计划,可惜!


$\Large\color{gray}{【无主之魂】}$

在cuda眼中,我的小怪不过是些无关紧要的【核函数】,像路边的顽石,滚到哪算哪,就算密语也无法解出。

此时,便需要再笼统地询问一次:


CHECK(cudaGetLastError());

CHECK(cudaDeviceSynchronize());

如果它们出事了,求求告诉本王,本王必定严加管教。

譬如我曾经沉迷回文魔法,要121 * 12321 个小怪随便计算点什么,测测它们的反应速度。

void global count() {
const int n = threadIdx.x;
auto t1 = clock();
double sum = 0;
for (int i=0; i<10000; i++) {
sum += i;
}
auto t2 = clock();
if (n % 1000 == 0) {
printf(“time cost %llu\n”, t2 - t1);
}
}

int main(void) {
count<<< 121, 12321 >>>();
CHECK(cudaGetLastError());
CHECK(cudaDeviceSynchronize());
}


结果又遭到了拒绝。

CUDA Error:
File:
Line:
Error code: 9
Error text: invalid configuration argument
```

几经尝试,我才发现12321一栏最大值只能有1024。所以我将两个参数颠倒了下。哈,看来是一些障眼法。不过,所有的障眼法都暗中标好了伤害值。这一点,本王尚未领悟。


$\Large\color{red}{【NPCの正义】}$

不过,经过本王四处游历,cuda大陆有着活跃而正义的NPC团体。它们隶属于CUDA-MEMCHECK工会,共有四名:

  • memcheck

  • racecheck

  • initcheck

  • synccheck

各位NPC各司其职。

memcheck: 检查是否访问了未申请的内存范围。

racecheck: 检查多个任务是否争抢同一个共享资源

initcheck: 检测全局资源是否初始化

synccheck: 检测多个任务是否能执行同步操作

犹如天书。

本王只需要memcheck帮忙,申请到魔法,放出属于自己的小怪就好了。



活动打卡代码 AcWing 4427. 树中节点和

#include <iostream>
#include <cstring>
#include <algorithm>

#include <vector>

int main()
{
    int n;
    std::cin >> n;

    std::vector<int> f(n);
    std::vector<long> num(n);
    std::vector<long> tot(n);
    std::vector<long> min(n);
    std::vector<int>  cnt(n);
    std::fill(cnt.begin(), cnt.end(), 0);
    std::fill(min.begin(), min.end(), 2e9);
    f[0] = 0;
    for (int i=1; i<n; i++) {
        std::cin >> f[i];
        f[i] --;
        cnt[f[i]] ++;
    }
    for (int i=0; i<n; i++) {
        std::cin >> num[i];
        if (num[i] != -1 && f[i] != i) {
            int fa = f[i];
            int ffa = f[fa];
            if (num[i] < num[ffa]) {
                printf("-1\n");
                return 0;
            }
            tot[fa] += num[i] - num[ffa];
            min[fa] = std::min(min[fa], num[i] - num[ffa]);
        }
    }
    long ans = 0;
    for (int i=0; i<n; i++) {
        if (f[i] == i) {
            ans += num[i];
            continue; //root
        }
        if (num[i] == -1 && cnt[i] != 0) {
            ans += tot[i] - min[i] * (cnt[i] - 2);
        }
    }
    printf("%ld", ans);
    return 0;
}




我是一个动作类游戏的BOSS,负责守在游戏最后的关卡,通过繁复的无穷无尽的攻击,给予勇者旅途中最大的痛苦,让他后悔来到这个地方,哭着躺着要重新来过。

距离勇者来临还有一周。我还有充足的时间准备。我必须充分地强化自己所需的技能。

看着技能表,我眼前一亮:召唤小怪。小怪大召唤术。就从这开始吧!

在旧世界里特尔(INTEL)的设定里,一个小怪召唤完才能召唤下一个小怪。小怪们的攻击也是一个个来。我能设定一些条件,令小怪无休止地自动产生,但动作的顺序不会改变。

更为恼火的是,要造出具有攻击力的,能沟通的小怪费心费力。要让他们有个复杂些的技能,譬如精准地识别勇者的位置并预测下一个轨迹,也许就耗费了数百万个里特尔时间。这样我一秒也就能攻击百十来下勇者。

好在,作为新晋的BOSS,我的世界还没完全定型。为此,我翻遍了魔族怪物族的典故,和openmp,tbb这些孱弱的并行技术几经纠缠。终于,在一本《GPU编程基础》里,我找到了想要的宝藏:库达(CUDA)

在这里,制造小兵需要三步:

  1. 用cuda语言画出想要的小兵和数量

  2. 唱NVCC技能,将小兵和制造过程做成可使用(executable)的道具

  3. 在必要的时候使用道具

我已有些迫不及待了。今天,立刻,我要听见一万个小怪对勇者喝倒彩。

第一步,画出想要的小怪,画出想要的数量


#include <stdio.h>

__global__ void booing_monster_from_gpu() {

  printf("twit you! twit you!\n");

}

int main(void) {

  booing_monster_from_gpu<<<1000, 1000>>>();

  cudaDeviceSynchronize();

  return 0;

}

这是个会对着勇者破口大骂的小怪。我起用了1000*1000=100w个。

第二步,NVCC技能。

nvcc booing_monster.cu -o booing_monster

我迫不及待按下了这个道具。勇者还有好几天才能到呢,没事,到时候再造。

于是我的世界都被怪物的咒骂声淹没了。


twit you!twit you!

twit you!twit you!

twit you!twit you!

twit you!twit you!

twit you!twit you!

twit you!twit you!

twit you!twit you!

....

不到十分钟,我中断了道具的使用。




前文们 更好的hashmap 提到快速查询的hash表。在CPU上,它可以用类似swiss-table的技术加速;再深挖技术和常数细节,提升空间已经有限;那么,还有没有别的思路?

在CPU版本的优化中,许多设计都是为了缓存友好设计出来的,比如减少间接寻址,去掉dumb-head,这足以说明缓存读取的效率至关重要;所以如果有一个读写效率更高的缓存,效率必然可以进一步提升。

因此,我们将目光移向火热的高性能计算技术:${\color{blue}{[GPU]}}$。在此,我们将讨论设备性能,并介绍一种GPU上的hash实现。


GPU部分

传输效率

计算设备与存储设备交互的效率可以用存储设备的带宽来估计。

与CPU交互的存储设备是内存。在标准工况下,带宽计算可以按频率*位数估算。目前较为主流的DDR4-3200 双通道方案,带宽约为51.2GB。

与之相比,GPU的带宽要高上一个数量级。例如,GeForce GTX 1080 Ti这样常见的型号,官方标注显存带宽为484GB。也就是说,普通GPU的带宽是

效率计算方法

存储效率高并不意味着可以取代CPU。CPU的指令执行经过漫长的演化,有着复杂的指令集,能推断复杂的执行流。

GPU的计算结构则简单非常多。它的计算单元更偏重计算,没有很好的分支预测功能。比如:

if (A) {

do_something_A;

} else {

do_something_B;

}

这个代码在GPU中通常需要等待do_something_A + do_something_B的时间。所以如果代码有许多嵌套的逻辑判断,或是代码本身就是顺序执行的,不需要多个核心的功能,那么GPU恐怕起不到多大的优化效果。


hashmap实现

实现一个hashmap,需要有存储key-value的数据结构、hash函数、以及解决冲突的方法。

在GPU的实现中,数据结构无非是一段连续内存,可以仿照STL中的设计实现。hash函数可以以核函数的形式运算,一般对整数或者字符串,可以使用murmurhash;解决冲突的方式因为涉及到下次访问的数据如何载入;为了最大限度利用内存连续性,这里使用和之前类似的线性探测方式。

GPU版STL: THRUST

针对hashmap的数据结构,nvidia提供了thrust。

thrust是Nvidia开发的基于CUDA的并行性算法库,它提供高级别的抽象编程,通过对内存获取和内存分配的优化来实现加速。

它提供类似stl的vector,algorithm,iterator的实现。algorithm中,thrust已经提供了sort,sum,minmax等常用操作 。sort的例程如下:

//区间内的数排序
#include <thrust/sort.h>

...
const int N = 6;
int A[N] = {1, 4, 2, 8, 5, 7};

thrust::sort(A, A + N);  
//按key排序
#include <thrust/sort.h>

...
const int N = 6;
int    keys[N] = {  1,   4,   2,   8,   5,   7};
char values[N] = {'a', 'b', 'c', 'd', 'e', 'f'};

thrust::sort_by_key(keys, keys + N, values);

// keys is now   {  1,   2,   4,   5,   7,   8}
// values is now {'a', 'c', 'b', 'e', 'f', 'd'} 
//使用自定义的排序函数
#include <thrust/sort.h>
#include <thrust/functional.h>

...
const int N = 6;
int A[N] = {1, 4, 2, 8, 5, 7};

thrust::stable_sort(A, A + N, thrust::greater<int>());

// A is now {8, 7, 5, 4, 2, 1}

cucollections中的hashmap

cucollection在thrust的基础上实现了hashmap。

它提供:

  • static_map,固定存储空间大小

  • dynamic_map,可根据输入调整存储空间大小

static_map提供Host 端的批量insert,search,erase,contains操作,也可以针对单个GPUthread进行查询。

dynamic_map相当于多个static_map的组合。如果插入的数据过多,使得static_map变“满”了,就再创建一个新的static_map存储。这样做的代价是,每次查询,都需要在所有已存在的static_map里查找一遍,无法提供O(1)复杂度的查找。因为dynamic_map只比static_map多了一个判断capacity扩容,以及多static_map遍历查找插入删除的操作,这里只介绍static_map,具体可以参考官方实现

HashMap构造

在cucollection中的例程里,它的构造函数如下:

// 声明

static_map(std::size_t capacity,

sentinel::empty_key<Key> empty_key_sentinel,

sentinel::empty_value<Value> empty_value_sentinel,

Allocator const& alloc = Allocator{},

cudaStream_t stream = 0);

// 调用

cuco::static_map<int, int> map{100'000,

cuco::sentinel::empty_key{empty_key_sentinel},

cuco::sentinel::empty_value{empty_value_sentinel},

cuco::cuda_allocator<char>{},

str};

与C++的map相比,这个构造参数复杂了很多。

首先,static_map需要指定hashmap的大小。这点可以key和hash表合适的load-factor倒推导出来。假设你需要插入10w个key,那么建议申请1/0.7*10w空间大小的哈希存储空间,以保证load-factor在50%左右,避免factor高时大量hash冲突带来的性能损耗。

紧接着尺寸大小传入了两个sentinel。static_map使用线性探查(probing)的方式检查hash是否冲突,sentinel用于标志当前这个hash_value的key是否结束。这两个值需要用户指定。同样的,如果需要删除,删除的节点会被static_map设置成erase_sentinel,这个值同样需要用户指定。如果static_map需要删除操作,对应的构造函数声明:

// 带删除的声明

static_map(std::size_t capacity,

sentinel::empty_key<Key> empty_key_sentinel,

sentinel::empty_value<Value> empty_value_sentinel,

sentinel::empty_value<Value> erase_value_sentinel,

Allocator const& alloc = Allocator{},

cudaStream_t stream = 0);

然后是allocator,这个函数用于cuda申请存储块大小的策略和位置。

接下来是这个任务的streamID。Cuda的程序都是异步执行的,需要有一个ID对一个任务的线程进行等待和同步。

总结一下:GPU版本初始化一个hashmap,比起CPU,需要提供:

  • 被C++实现隐藏的两种sentinel,empty和erase(如果需要的话),可以默认设置为-1
  • 与设备同步相关的参数 stream_id

API调用

thrust::device_vector<thrust::pair<int, int>> pairs(50'000);



// Create a sequence of pairs {{0,0}, {1,1}, ... {i,i}}

thrust::transform(thrust::make_counting_iterator<int>(0),

thrust::make_counting_iterator<int>(pairs.size()),

pairs.begin(),

[] __device__(auto i) { return thrust::make_pair(i, i); });



// Inserts all pairs into the map

map.insert(

pairs.begin(), pairs.end(),

cuco::detail::MurmurHash3_32<int>{},

thrust::equal_to<int>{},

str

);

static_map是基于thrust的库的数据结构实现的。这些结构与C++ STL的定义类似。每个map的item是pair[HTML_REMOVED],first代表key, second代表value。

transform的定义也与C++ STL类似,前两个参数表示需要遍历的迭代器范围,第三个参数是结果开始插入的位置,接下来是一个lambda函数,表明对每个迭代器内容需要做的操作。

构造完需要的key之后,可以使用批量操作的接口插入map。批量处理一方面可以减少设备通信的时间;另外,也可以提升操作的并行度,从而提升效率。

static_map的Host端API都以批量操作形式给出,具体支持的操作有:

  • insert
  • erase
  • contains
  • find




前言

STL在C++开发中的重要性无需多言。它设计精妙,性能卓越,功能方便。

大多数时候,它都易于使用。但利刃能开几分光,还要依赖使用者的修养。提修养,多读书。《Efficient STL》中给出了关于STL的50条建议。

这些规则分为以下七个部分:

  • $\color{blue}{【Container(容器)】}$

  • $\color{blue}{【vector\&string(列表和字符串)】}$

  • $\color{blue}{【associate container(关联型容器)】}$

  • $\color{blue}{【Iterator(迭代器)】}$

  • $\color{blue}{【Algorithms(算法)】}$

  • $\color{blue}{【Function(函数)】}$

  • $\color{blue}{【编码实践】}$

本文将介绍最后两部分的内容。希望它们提供快如闪电的性能,同时照亮使用过程中的疑惑。


Functor 仿函数

函数通常以函数指针的形式传入;如果作为Predictor(返回值为true, false) 传参给remove_if等函数,需要保证他是无状态的函数;


有些函数对象接口需要特定的typedef。

对not1, not2, bind1st, bind2nd, 要求定义argumenCtype, firscargumenCtype, second_argument_type, 和 result_type;

可以调用ptr_fun对函数进行封装, 或者构造继承自unary_function, binary_function来实现定义

//原始的operator:

list<Widget*> widgetPtrs;

//ptr_fun形式

bool isInteresting(const Widget *pw);

list<Widget*>::iterator i = find_if(widgetPtrs.begin(), widgetPtrs.end(), not1(ptr_fun(isInteresting)));

//继承unary_function形式

template <class T>

class MeetsThreshold: public std::unary_function<Widget, bool> {

private:

const T threshold;

public:

MeetsThreshold(const T& threshold);

bool operator()(const Widget&) const;

...

};

list<widget> widgets;

...


list<Widget>::reserve_iterator i1=find_if(widgets.rbegin(), widgets.rend(), not1(MeetsThreshold<int>(10)));

//继承binary_function形式

struct WidgetNameCompare:

std::binary_function<Widget, Widget, bool> {

bool operator()(const Widget& lhs, const Widget& rhs) const;

};

list<Widget>::iterator i2=find_if(widgets.begin(), widgets.end(), bind2nd(WidgetNameCompare(), w));

关于ptr_fun, mem_fun, mem_fun_ref的作用:

ptr_fun: 为函数加一些typedef

mem_fun: 如果容器里存的是类的指针,需要用mem_fun传类函数参数

vector<Widget*> wv;
for_each(wv.begin(),wv.end(),mem_fun(&Widget::print));  

mem_fun_ref: 如果容器里存的是类,需要用mem_fun_ref传类函数参数

vector<Widget> wv;

for_each(wv.begin(),wv.end(),mem_fun_ref(&Widget::print));

使用STL编程

尽量使用algorithm代替手写的循环。algorithm效率更高,可读性更好。

以下是一个例子:

deque<double>::iterator insertLocation = d.begin();

//手动循环

for (size_t i=0; i<numDoubles; ++i) {

insertLocation = d.insert(insertLocation, data[i] + 41);

++insertLocation;

}

//STL的transform方式

transform(data, data + numDoubles, inserter(d, d.begin()), bind2nd(plus<int>(), 41));

尽量使用member function,代替同名的algorithm。

std::find(set)将会遍历set,而set.find()可以利用set本身的平衡树结构,在logN时间复杂度下查询;

list的member function行为与algorithm内的同名函数不同,list的merge,remove, remove_if会移除对象;list也不能作为algorithm中sort的参数


查找算法的使用:

无序的序列:查找元素是否存在用find, 统计出现次数用count

有序序列:查找元素是否存在用binary_search; 查找元素位置, lower_bound会返回元素的位置或元素不存在时应插入的位置;equal_range会返回一组pair,表示lower_bound, upper_bound返回的值;如果两个值相等,则说明key不存在;

关联容器可以使用容器自身的find来查找

详细信息可参见下表:


使用函数对象代替函数指针作为algorithm的参数可以获得更高的效率。函数对象的operator是inline的,编译器会直接展开代码。函数则传递了函数指针,编译器无法将函数代码内联。同时,这样可以避免语法上的歧义。


使用嵌套的STL函数需要注意可读性可维护性,避免写出write-only代码。

//write-only代码示例

//删除满足vi < x && i > last_i, last_i 是最后一个大于y的位置

vector<int> v;

int x, y;

v.erase(

remove_if(

find_if(v.rbegin(), v.rend(), bind2nd(greater_equal<int>(), y)).base(), //找到last_i

v.end(),

bind2nd(less<int>(), x)), //找到小于x的位置

v.end()

);

STL头文件规律:

几乎所有容器都在名字对应的头文件里,比如使用list需要#include [HTML_REMOVED]

除了accumulation, inner_product, adjacent_difference, partial_sum,其他算法都在[HTML_REMOVED]里声明;这四个函数声明在[HTML_REMOVED]中

迭代器声明在[HTML_REMOVED]中

functor(less[HTML_REMOVED])和functor adapter(not1, not2, mem_fun) 在functional中声明


STL编译错误诊断:

string是一个typedef, 与它有关的编译错误通常包含类似basic_string[HTML_REMOVED], allocator[HTML_REMOVED] >的形式,将它替换为对应的类型编译错误会更清晰

对vector和string而言, iterator通常是指针,如果编译错误提到了指针,可能是迭代器相关的错误

一些函数有复杂的模板构造参数,可以直接把模板构造参数忽略,让报错信息更清晰

包含back_insert_iterator, front_insert_iterator, insert_iterator的错误通常是在调用对应的inert操作时出错了

如果错误信息包含binder1st, binder2nd, 则是执行对应的bind1st, bind2nd出错了

如果出现了algorithm中的exception,通常是算法的传参错误,比如iterator类型不对

如果报找不到类型,通常是头文件没有被正确引用


STL相关的网站

• The SGI STL site, http://www.sgi.com/tech/stl/.

提供完备的STL文档,同时也提供接口兼容的一些版本,比如hash_set, hash_multiset, hash_map, hash_multimap; 单向链表;存储超大字符串的容器rope; 一系列非标准的函数对象和接口;

• The STLport site, http://www.stlport.org/.

提供一个兼容20多个编译器的SGI’s STL实现版本,并且提供了debug mode

• The Boost site, http://www.boost.org/.

提供更强大的函数对象(functio object) 和关联容器;提供有可能进入C++ 标准的算法实践;




前言

STL在C++开发中的重要性无需多言。它设计精妙,性能卓越,功能方便。

大多数时候,它都易于使用。但利刃能开几分光,还要依赖使用者的修养。提修养,多读书。《Efficient STL》中给出了关于STL的50条建议。

这些规则分为以下七个部分:

  • $\color{blue}{【Container(容器)】}$

  • $\color{blue}{【vector\&string(列表和字符串)】}$

  • $\color{blue}{【associate container(关联型容器)】}$

  • $\color{blue}{【Iterator(迭代器)】}$

  • $\color{blue}{【Algorithms(算法)】}$

  • $\color{blue}{【Function(函数)】}$

  • $\color{blue}{【编码实践】}$

本文将介绍iterator, algorithm两部分的内容。希望它们提供快如闪电的性能,同时照亮使用过程中的疑惑。



迭代器

应该尽可能地使用iterator作为迭代器。

container包含四种类型的迭代器:iterator, const iterator, reverse iterator, const reverse iterator; 而insert 和 erase接口只接收container iterator;各种迭代器支持的类型转换关系如下:

iterator可被转换为另外的迭代器类型,可以方便地进行运算操作;


iterator和const iterator是两种不同的类型,需要用advance和distance进行转换。

考虑下面的代码:

typedef sometype::iterator iter;

typedef sometype::const_iterator const_iter;

const_iter ci;

iter i(ci);

iter i(const_cast(ci));

如果sometype是vector或者string, 因为iterator通常是T*的类型别名,转换能够成立;对deque,list,hash_map等类型则不能;

正确的转换方式如下:

iter i = some_instance.begin();

advance(i, distance(i, ci));

调用reverse_iterator.base()可以获得相应的iterator,它们的指针关系如图:

ri到rbegin()的offset是4,对应的base()里,i到begin()的offset也是4;

如果要进行对应位置的操作,需要计算位移:

v.erase((++ri).base()); // erase

可以用istream_iterator将文件内容拷贝到string:

ifstream inputFile("a.txt");

inputFile.unsetf(ios::skipws); //保留空格

string fileData((istream_iterator<char>(inputFile)), istream_iterator<char>());

如果只是读入字符,不需要做复杂的选项校验,可以使用istreambuf_iterator,在作者的benchmark测试中可以获得40%的性能提升:

ifstream inputFile("interestingData.txt");

string fileData( (istreambuf_iterator(inputFile)), istreambuf_iterator());

算法

算法库的目标地址需要有足够的空间,如果需要放在目标vector结尾,需要使用back_inserter,而不是使用end()

std::vector<int> results;

transform( values.begin(), values.end(), results.end(), some_func);

//错误,results.end() 没有内存地址

transform( values.begin(), values.end(), back_inserter(results.end()), some_func );

//正确

关于排序的选择:

  1. 如果需要对vector, string, deque, array 进行完成的排序,可以使用sort或者stable_sort
  2. 如果需要vector, string, deque, array的最大,次大……第N大的数,可以使用partial_sort
  3. 如果需要vector, string, deque, array的前N大的数,或者仅仅是第N大的数,而不关心顺序,可以用nth_element
  4. 如果要判断vector, string, deque, array中有哪些元素符合某个条件,可以用partition或stable_partition
  5. 如果数据是一个链表类型,partition和stable_partition仍然可以用;list::sort可以代替sort和stable_sort;如果是需要partial_sort或nth_element,需要额外的代码实现,比如把数据拷贝进vector并进行排序;

它们消耗的资源依次排列如下:

partition > stable_partition > nth_element > partial_sort > sort > stable_sort


要删除元素的话,仅用remove-like的函数是不够的,还需要加上erase


template Forwardlterator remove(Forwardlterator first, Forwardlterator last, const T& value);

remove只接受iterator,也只返回iterator,不知道具体的容器是什么,所以无法从容器里删除数据;获得iterator后需要再调用对象的erase成员函数进行删除。

remove的作用:将不需要remove的元素前移,并返回新的end位置;

正确的删除方法:

std::vector<int> v;

v.erase(remove(v.begin(), v.end(), 99), v.end()); // 删除所有值为99的元素;

list.remove合并了erase和std::remove。


如果容器里存储的是指针类型,使用remove的时候要注意是否释放资源;最好存储智能指针的类型,析构时可以自动释放资源;


要求数组有序的算法有:binary_search, lower_bound, upper_bound, equal_range, set_union, set_intersection, set_difference, set_symmetric_difference, merge, inplace_merge


使用mismatch和lexicographical_compare可以实现大小写不敏感的字符串比较函数

mismatch:参数mismatch(s1.begin(), s1.end(), s2.begin(), predictor), 它会返回第一个不匹配的pair,可以根据pair返回的iterator值判断是否相等;

lexicographical_compare: 一个strcmp的更通用的版本,如果第一个区间短于第二个区间,或者比较函数返回true,则compare函数返回true;实现自定义的compare函数即可


bool ciCharLess(char c1, char c2) {

return tolower(static_cast<unsigned char>(c1)) < tolower(static_cast<unsigned char>(c2));

}

bool ciStringCompare(const string& s1, const string& s2) {

return lexicographical_compare(s1.begin(), s1.end(), s2.begin(), s2.end(), ciCharLess);

}

另外还可以用非stl的第三方函数库:

int ciStringCompare(const string &s1, const string &s2) {

return stricmp(s1.c_str(), s2.c_str());

}

STL不提供默认的copy_if函数,一种实现如下:

template<typename InputIterator, typename OutputIterator, typename Predicate>

OutputIterator copy_if(InputIteratro begin, InputIterator end, OutputIterator destBegin, Predicate p) {

while (begin != end) {

if (p(*begin)) *destBegin++ = *begin;

++begin;

}

return destBegin;

}

可以用accumulate和for_each对一段区间求和


list<double> ld;

double sum = accumulate(ld.begin(), ld.end(), 0.0);

//注意第三个参数需要写成0.0,默认double 类型

accumulate 还可以传入自定义的range处理函数:

vector<float> vf;

float product = accumulate(vf.begin(), vf.end(), 1.0, multiplies<float>());

// 连乘;

list<Point> lp;

Point avg = accumulate(lp.begin(), lp.end(), Point(0.0), PointAverage()); //求点的质心

class PointAverage:

public: binary_function<Point, Point, Point> {

public:

PointAverage():xSum(0), ySum(0), numPoints(0) {}

const Point operator() (const Point& avgSoFar, const Point& p) {

++numPoints; xSum += p.x; ySum += p.y;

return Point(xSum / numPoints, ySum / numPoints);

private:

size_t numPoints;

double xSum;

double ySum;

}

for_each的传参和accumulate类似,但for_each主要对每个元素起作用;它的返回类型是函数对象,需要用额外的对象获取结果:


//用for_each 求点的质心

class PointAverage:

public: binary_function<Point, Point, Point> {

public:

PointAverage():xSum(0), ySum(0), numPoints(0) {}

void operator() (const Point& avgSoFar, const Point& p) {

++numPoints; xSum += p.x; ySum += p.y;

}

const Point result() const { //这里获取结果

return Point(xSum / numPoints, ySum / numPoints);

}

private:

size_t numPoints;

double xSum;

double ySum;

}

list<Point> lp;

Point avg = for_each(lp.begin(), lp.end(), PointAverage()).result(); //foreach之后计算结果