Cache 常见问题

去年的时候在做系统性能优化的工作中,花费了大量的精力为业务定制化缓存方案,当时感觉尽善尽美了,但前些天不经意再聊起缓存时发现在一些细节上还欠考虑。在这里总结一下做 cache 需要考虑的问题。

大纲如下:

  • 缓存模式(Cache Pattern)
  • 缓存置换(Cache Replacement)
  • 缓存穿透(Cache Penetration)
  • 缓存雪崩(Cache Avalanche)

缓存模式/Cache Pattern

比较常见的模式有分为两大类: Cache-aside 以及 Cache-as-SoR。其中 Cache-as-SoR(System of Record, 即直接存储数据的DB) 又包括 Read-through、Write-through、Write-behind。

Cache-aside

Cache-aside 是比较通用的缓存模式,在这种模式,读数据的流程可以概括:

  1. 读 cache,如果 cache 存在,直接返回。如果不存在,则执行2
  2. 读 SoR,然后更新 cache,返回
    代码如下:
    1
    2
    3
    4
    5
    6
    7
    # 读 v1
    def get(key):
    value = cache.get(key)
    if value is None:
    value = db.get(key)
    cache.set(key, value)
    return value

写数的流程为:

  1. 写 SoR
  2. 写 cache
    代码如下:
    1
    2
    3
    4
    # 写 v1
    def set(key, value):
    db.set(key, value)
    cache.set(key, value)

逻辑看似很简单,但是如果在高并发的分布式场景下,其实还有很多惊喜的。

Cache-as-SoR

在 Cache-aside 模式下,cache 的维护逻辑要业务端自己实现和维护,而 Cache-as-SoR 则是将 cache 的逻辑放在存储端,即 SoR + cache 对于业务调用方而言是透明的一个整体,业务无须关心实现细节,只需 get/set 即可。Cache-as-SoR 模式常见的有 Read Through、Write Through、Write Behind。

  • Read Through: 发生读操作时,查询 cache,如果 Miss,则由 cache 查询 SoR 并更新,下次访问 cache 即可直接访问(即在存储端实现 cacha-aside)
  • Write Through:发生写操作时,查询 cache,如果 Hit,则更新 cache,然后交由 cache model 去更新 SoR
  • Write Behind:发生写操作时,不立即更新 SoR,只更新缓存,然后立即返回,同时异步的更新 SoR(最终一致)

Read/Write Through 模式比较好理解,就是同步的更新 cache 和 SoR,读取得场景也是 cache 优先,miss 后才读 SoR。 这类模式主要意义在意缓解读操作的场景下 SoR 的压力以及提升整体响应速度,对写操作并没有什么优化,适用于读多写少的场景。Write Behind 的的 cache 和 SoR 的更新是异步,可以在异步的时候通过 batch、merge 的方式优化写操作,所以能提升写操作的性能。

下面两图是取自 wikipedia 的 Write Through 和 Write Behind 的流程图:
Write Through 和 Write Behind

小结

当前很多 DB 都自带基于内存的 cache ,能更快的响应请求,比如 Hbase 以 Block 为单位的 cache,mongo 的高性能也一定程度依托于其占用大量的系统内存做 cache 。不过在程序本地再做一层 local cache 效果会更加明显,省去了大量的网络I/O,会使系统的处理延时大幅提升,同时降低下游 cache + db 的压力。

缓存置换/ Cache Replacement

缓存淘汰算是比较老的一个话题,常用的缓存策略也就那么几个,比如 FIFO、LFU、LRU。而且 LRU 算是缓存淘汰策略的标配了,当然在根据不同的的业务场景,也可能其他策略更合适。

FIFO 的淘汰策略通常使用 Queue + Dict, 毕竟 Queue 先天就是 FIFO 的,新的缓存对象放在队尾,而当队列满时将队首的对象出队过期。

LFU (Least Frequently Used)的核心思想是最近最少被使用的数据最先被淘汰,即统计每个对象的使用次数,当需要淘汰时,选择被使用次数最少的淘汰。所以通常基于最小堆 + Dict 实现 LFU。因为最小堆每次变化的复杂度为 O(logn),所以LFU算法效率为 O(logn),相比 FIFO、LRU O(1) 的效率略低。

LRU(Least recently Used),基于局部性原理,即如果数据最近被使用,那么它在未来也极有可能被使用,反之,如果数据很久未使用,那么未来被使用的概率也较低。

LRU 过期通常使用双端链表 + Dict
实现(在生产环境使用链表一般都是双链表),将最近被访问的数据从原位置移动到链表首部,这样在链首位置的数据都是最近被使用过的,而链尾都是最久未使用的,在 O(1) 的时间复杂度内即可找到要被删除的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# LRU 缓存过期概要逻辑, 无锁版
data_dict = dict()
link = DoubleLink() # 双端队列
def get(key):
node = data_dict.get(key)
if node is not None:
link.MoveToFront(node)
return node
def add(key, value):
link.PushFront(Node(key,value))
if link.size()>max_size:
node = link.back()
del(data_dict[node.key])
link.remove_back()

Ps:

  1. py3 functools 中 lru_cache 的实现
  2. golang 实现 lru cache

缓存穿透/Penetration

当请求访问的数据是一条并不存在的数据时,一般这种不存在的数据是不会写入 cache,所以访问这种数据的请求都会直接落地到下游 SoR,当这种请求量很大时,同样会给下游 db 带来风险。

解决方法:

  1. 可以考虑适当的缓存这种数据一小段时间,将这种空数据缓存为一段特殊的值。

  2. 另一种更严谨的做法是使用 BloomFilter, BloomFilter 的特点在检测 key 是否存在时,不会漏报(BloomFilter 不存在时,一定不存在),但有可能误报(BloomFilter 存在时,有可能不存在)。Hbase 内部即使用 BloomFilter 来快速查找不存在的行。

基于 BloomFilter 的预防穿透:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 读 v3
r = redis.StrictRedis()
def get(key, retry=3):
def _get(k):
value = cache.get(k)
if value is None:
if not Bloomfilter.get(k):
# cache miss 时先查 Bloomfilter
# Bloomfilter 需要在 Db 写时同步事务更新
return None, true
if r.set(k,1,ex=1,nx=true):
value = db.get(k)
cache.set(k, value)
return true, value
else:
return None, false
else:
return value, true
while retry:
value, flag = _get(key)
if flag == True:
return value
time.sleep(1)
retry -= 1
raise Exception("获取失败")

以上两种通用的方法对于不同的场景,各有优劣。缓存空数据的方案,在应对恶意的穿透攻击时效果会很差,因为恶意请求的一般特点是 key 随机伪造,请求量巨大,所以此时缓存空数据的方案会大量缓存无意义的数据,这些数据往往不会被二次访问,因此对恶意攻击不会产生任何效果,此时使用 bloomfilter 的方案更优。相反的,如果不存的 key 是可预估的,或者有限的的,比如是通过一定规则生成的,难以伪造,此时缓存空数据的方案较优。

缓存雪崩/Avalanches

雪崩,即在某些场景下,缓存宕机、失效、过期等等因素会导致大量的请求直接落到下游的 DB,对 DB 造成极大的压力,甚至一波打死 DB 业务挂掉。

不同场景下,导致雪崩的原因各不相同。

Case 1

在高并发场景下(比如秒杀),如果某一时间一个 key 失效了,但同时又有大量的请求访问这个 key,此时会发生大量的 cache miss,可能引发雪崩。

这种情况下比较通用的保护下游的方法是通过互斥锁访问下游 DB,获得锁的线程/进程负责读取 DB 并更新 cache,而其他 acquire lock 失败的进程则重试整个 get的逻辑。

以 redis 的 set 方法实现此逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 读 v2
r = redis.StrictRedis()
def get(key, retry=3):
def _get(k):
value = cache.get(k)
if value is None:
if r.set(k,1,ex=1,nx=true): # 加锁
value = db.get(k)
cache.set(k, value)
return true, value
else:
return None, false
else:
return value, true
while retry:
value, flag = _get(key)
if flag == True:
return value
time.sleep(1) # 获取锁失败,sleep 后重新访问
retry -= 1
raise Exception("获取失败")

Case 2

大量 key 同时过期,导致大量 cache miss。冷启动或流量突增等都有可能导致在极短时间内有大量的数据写入缓存,如果它们的过期时间相同,则很可能在相似的时间内过期。

解决方法:

  1. 一个比较简单的方法是随机过期,即每条 data 的过期时间可以设置为 expire + random

  2. 另一个比较好的方案是可以做一个二级缓存,比如之前做缓存时设计的一套 local_cache + redis 的存储方案,或者 redis + redis 的模式。

另外,就是合理的降级方案。在高并发场景下,当检测到过高的并发可能或已经对资源造成影响后,通过限流降级的方案保护下游资源,避免整个资源被打垮而不可用,在限流期间逐步构建缓存,当缓存逐渐恢复后取消限流,恢复降级。

参考

https://medium.com/@mena.meseha/3-major-problems-and-solutions-in-the-cache-world-155ecae41d4f
http://www.ehcache.org/documentation/3.5/caching-patterns.html
https://docs.microsoft.com/en-us/azure/architecture/patterns/cache-aside
https://coolshell.cn/articles/17416.html
https://en.wikipedia.org/wiki/Cache_(computing)
https://docs.oracle.com/cd/E13924_01/coh.340/e13819/readthrough.htm

Structured Streaming Tips (一)

前言

Spark / Storm 简单对比:

  • storm 的特点是延时更低,而 spark 吞吐更高
  • spark 支持 sql 形式的 streaming 开发,批处理场景和流处理场景可以很大程度上的公用代码,开发效率高,而 storm 不支持批处理且 sql 式 streaming 仍处理 beta 阶段,所以其开发成本更高
  • 相比 storm 的资源调度,spark 的资源调度可以基于 yarn、mesos 等,其资源利用率更加高效

kryo 序列化

tuning a Spark application – most importantly, data serialization and memory tuning 官方文档指出,序列化和内存调整是整个优化 spark 程序的最重要的两点。

spark 默认使用 Java serialization ,相比 Kryo serialization, 其序列化速度、压缩都差距非常大 ,最大速度相差约 10x,压缩 5x。使用 kryo 序列化能极大的提升内存的使用效率,以及处理速度,是程序优化的第一步!

spark kryo demo

1.带序列化的 class 实现 java.io.Serializable:

image.png

2.实现注册Kyro序列化类,将待序列化的类注册,这一步是可选的,如果未注册,Kryo 仍然可以工作,但它必须存储每个对象的完整类名称,这是浪费的。所以最好注册

image.png

3.修改 spark session 序列化相关的配置 spark.serializer 以及spark.kryo.registrator :

image.png

参考

https://blog.csdn.net/leen0304/article/details/78732171
https://github.com/holdenk/learning-spark-examples/blob/master/src/main/java/com/oreilly/learningsparkexamples/java/BasicAvgWithKryo.java

内存优化

内存管理机制

spark 在1.6 版本以后引入了新的内存管理机制——UnifiedMemoryManager,其内存管理模型大致可以分为三部分 Reserved Memory,User Memory,Spark Memory,如下:

image.png

  • Reserved Memory 默认 300M,系统预留,需要重新编译 spark 才能更改。官方介绍为 测试使用的,一般情况下我们无需关心。(上图紫色部分)

  • UserMemory,用户内存。其被用来存储用户自己的数据,完全由你操作,比如 input data,map 操作后的 transform data,这部分内存在SparkMemory 分配后才会分配。(上图蓝色部分)

  • SparkMemory,这部分内存的用途又被分为两类:

  • Storage Memory:主要用来缓存 spark data 以及作为 ‘unrool’ 序列化数据的临时空间,以及存储 broadcast vars。当这部分内存不足时,unroll 以及 broadcast 的存储会落磁盘,不会OOM,当然代价是性能的损失。在资源不足时,牺牲一定的性能,保证稳定的前提下,可以适当的降低此部分的内存消耗。

  • Execution Memory: 主要用来存储Spark task执行需要的对象,比如 shuffle、join、union、sort 等操作 buffer。这块内存会 OOM,且无法被其他tasks clean。注意保证此块足够内存可用。

在我们的应用场景中,主要特点是:

  • 大量的 kafka input data(15w qps)

  • 按 5min 的 window 以及访问的 uuid(id+url)为 group key,然后 count。

在这种场景中,不需要缓存,storage 的主要用途为 unroll 以及 broadcast,所以 Storage Memory 可以降低到很低的值。

image.png

另外使用 execution memory 的部分主要是 groupby shuffle,在我们的处理逻辑中 group by 之前会 filter无意义的http request,同时以一个更小的 CountUnit 对象(仅仅保留 http request 的 host,正则匹配后的url,event timestamp,ip,id)做 frequency 的 count,进最大程度的缩减存储,控制 shuffle 传输的数据量,所以 execution 部分也可以设置的很小,如下图 executor storage memory 的使用占比以及 shuffle 的使用占比。

image.png

spark memory 中 storage 和 execution 的最大占比分配通过 spark.memory.storageFraction 控制,默认值为 0.5 即,各占一半。为什么说是最大占比的?因为整个 spark memory 是共享的,即可以互相侵占,这个参数配置的是 storage memory 在整个 spark memory 中的最小占比。因为 storage 是可以被 execution 驱逐,所以这个参数设定了一个被驱逐的底线,即留给 storage 的最小空间。反之 execution 无法被 storage 驱逐,但 execution 空闲时,是可以被 spark memory 使用的,最大可能的提高内存利用率。

在不同的场景下,需要根据不同的需求调整 spark.memory.storageFraction

从上文storage memory 和 execution memory 的占比可以看出,我们的程序对 spark memory 的依赖相对较低。为什么说相对较低呢,因为相对整个 kafka input 的数据的入队量较低,kafka input 的原始数据是一个完成的 http request,以当前 qps 15w + 5min window 来看,着实不是一个小数据量,而这部分数据量占用的是 user memory,所以说相对 user memory 而言对spark memory 依赖较低。

User memory 和 spark memory 在整个 heap 的分配是通过 spark.memory.fraction 参数配置的,默认是 0.6(2.0 及以上版本,1.6 是0.75),即 user memory 占约 0.4 executor memory(比这个值略低,实际为 0.6 ( Executor Memory - Reserved Memory)), spark memory 占约 0.6 executor memory,根据不同的场景,调整此值能最大化的优化资源利用。

在我们 frequency-count 的实际生产环境(qps 15w + 5min window group)中,设置每个 executor memory 为 10g 时,发现运行较慢的 task 日志中多次出现 Full GC,开始认为是 GC 问题,经过不断调参优化,虽然有一点提升,但当运行一段时间后,仍然会频繁出现 FULL GC,task 执行耗时越来越大(几十分钟)。后来仔细观察 GC 日志发现,GC 后 整个 old gen 仍然处于一个很大的值,趋于占满其上限,GC 回收效率一般,这是因为 user memory 不足,从kafka 源源不断的读取数据,由于 user memory 不足,导致不断 gc 回收空间分配给 input data。

image.png

增大 User memory 的两个方法,一是调大 executor memory, 而是提升 user memory 在整个 executor memory 的占比。因为我们的场景中对 spark memory 的依赖较小,所以在适当增大 executor memory (10g ->16g )配置后,并通过降低 spark.memory.fraction 的值(默认 0.6 -> 0.2)提升的 user memory 的大小。

整个 streaming job 运行了 24h 后,每个 stage 不会再出现之前最大执行几十分钟的情况了,因为 task 的GC 日志中不会再频繁出现 FULL GC,但没有释放太多资源的情况。在 input data 波峰时,最慢的 stage 也可以再分钟级完成(之前运行 1个小时后,就会出现某个 stage 的某个 task GC 耗时达到 30min 的情况)。

参考

Spark Memory Management :https://0x0fff.com/spark-memory-management/
spark memor configuration: http://spark.apache.org/docs/latest/configuration.html#memory-management
Spark 内存管理详解:https://www.ibm.com/developerworks/cn/analytics/library/ba-cn-apache-spark-memory-management/index.html
spark 内存管理:https://wongxingjun.github.io/2016/05/26/Spark%E5%86%85%E5%AD%98%E7%AE%A1%E7%90%86/

GC 优化

生产环境中,我们的 streaming job 在运行时间长时间后(12h)发现仍会出现执行7.8 min 的 task,查看其 GC 日志发现又出现了 FULL GC,虽然可以接受这种个位数分钟级的延时,但是生产环境最好还是避免 FULL GC。

image.png

下面聊一聊生产环境的 GC 优化过程。

GC 参数调整

在生产环境数据量较大的场景下(15wqps ),GC 是一个不可避免的问题,默认 spark 使用 Parallel GC,尽管 Parallel GC 是多线程并发执行,但受限于传统的JVM 内存管理和HEAP结构(如下图),其不可避免的受 Full GC 影响,易出现较大时间的停顿。

image.png

对于流式场景而言显然长时间的 “stop the world” 是难以接受的,spark 官方推荐在 streaming 场景更推荐使用 G1GC。G1GC 是 oracle 推出的以取代 CMS 为目标的 GC(当然现在已经做到)并在 JAVA 1.9 中成为默认 GC ,其特点是 low-pause, server-style,在实现高吞吐量的同时,尽肯能的控制暂停时间。个人理解是 Parallel GC 和 CMS GC 的综合体。G1GC 的 HEAP 结构和传统的不同,更加高效,如下:

image.png

spark executor gc 配置,通过 –conf spark.executor.extraJavaOptions 指定:

1
spark-submit --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" xxx.jar

同时最好添加以下配置打印 GC 日志,方便 G1GC相关参数的调整:

1
-XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy

在生产环境 GC 日志可以发现发生了 FULL GC:

image.png

G1GC 只提供了 YONG GC 和 Mixed GC,当 Mixed GC 无法满足进程的内存分配时会触发 serial old GC(full GC),其效率相比 Parallel GC 是差很多的。所以可以通过提早 Mixed GC,以及加快 Mixed GC 来尽量规避 FULL GC,添加参数如下:

1
2
-XX:InitiatingHeapOccupancyPercent=35  # 触发标记周期的 Java 堆占用率阈值, 默认 45%,注意是 `non_young_capacity_bytes,包括 old+humongous` 的占比
-XX:ConcGCThreads=20 #  并行标记的线程数,会占用一定资源

另外,日志中如出现:

image.png

则表示有 humongous object,这些 obj 只有在 FULL GC 才会回收,所以可以,增大G1HeapRegionSize 相关配置的值,尽量减少 Humongous Area 区域在 heap 中的创建:

1
-XX:G1HeapRegionSize=16m #  G1 区域的大小。值是 2 的幂,范围是 1 MB 到 32 MB 之间。目标是根据最小的 Java 堆大小划分出约 2048 个区域

G1的 evacuation pause 在几十到一百甚至两百毫秒都很正常。所以最好不要把MaxGCPauseMillis 设得太低,不然G1跟不上目标就容易导致垃圾堆积,反而更容易引发full GC而降低性能。

1
-XX:把MaxGCPauseMillis=1000 # 默认是 200ms,在以分钟为处理单位的生产环境可以接受秒级的暂停

整个 spark executor 的完整配置:

1
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=20 -XX:G1HeapRegionSize=16m"

尽可能少的减少内存占用

GC的成本与 Java 对象的数量成正比,因此使用较少对象的数据结构大大降低了此成本。

1. Java中,有三种类型比较耗费内存:

  • 对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。

  • 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。

  • 集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。

因此Spark官方建议,尽量不要使用上述三种数据结构:

  • 使用字符串替代对象,

  • 使用原始类型(比如Int、Long)替代字符串,

  • 使用数组替代集合类型(Spark 官方推荐使用 fastutil 中提供的集合类型)

2. 对于包含 filter 算子的场景,尽可能早的 filter,然后在 map、reduce,减少在 map、reduce 过程中创建对象或其他变量的数量。

3. 拼接字符串时,避免隐式的String字符串,String字符串是我们管理的每一个数据结构中不可分割的一部分。它们在被分配好了之后不可以被修改。比如”+”操作就会分配一个链接两个字符串的新的字符串。更糟糕的是,这里分配了一个隐式的StringBuilder对象来链接两个String字符串。eg:

1
2
       StringBuilder tmp = new StringBuilder(“test”);
       tmp.append("#").append(”test“);

以上的目的主要为了尽可能地减少内存占用,从而降低GC频率,提升性能。

参考

spark 官方推荐 G1GC:http://www.bijishequ.com/detail/492289?p=70-69
Java GC 分类:https://www.bridgeli.cn/archives/342
G1GC oracle 官方doc : http://www.oracle.com/technetwork/articles/java/g1gc-1984535.html
G1GC 实现基本原理:https://tech.meituan.com/g1.html
G1GC 实现讨论:http://hllvm.group.iteye.com/group/topic/44381http://hllvm.group.iteye.com/group/topic/21468
spark gc 调优实践: https://www.csdn.net/article/2015-06-01/2824823
https://www.infoq.com/articles/G1-One-Garbage-Collector-To-Rule-Them-All
https://www.infoq.com/articles/tuning-tips-G1-GC
http://www.bijishequ.com/detail/492289?p=70-69

反爬小述

反爬虫是一个持续、对抗的过程,没有一劳永逸的方法,需要不断的投入,所以,无法完全的防治爬虫,而反爬虫要做的其实归纳起来很简单:不断提升爬虫爬取成本。

前端保护

很多网站的对于核心数据在前端展示时都做了一些数据隐藏、编码等手段(比如图片替换文字、字符映射),对于爬虫而言,即使拿到 html,但是解析其中的内容也比较复杂。部分网站还采用动态 url生成的防范机制,难度更高。

前端做保护工作在一定程度上能提高对核心数据的保护,不断的更新前端保护机制,爬虫也需要不断研究破解机制,对爬虫来说是非常耗时的操作。

Example

猫眼

猫眼电影的一些数值类数据,包括票房、票价等都是通过 “stonefont” 控制,并不是真正的数字,需要通过对应的字体字符集映射回去,而且每次请求下发的映射都是动态的:

猫眼票价隐藏

所以爬取时需要兼顾爬取 html 以及字符映射关系,然后自行解析。

去哪儿

去哪儿票价通过前段 html 多个元素叠加合成,要是稍不注意,爬取的就是脏数据:
去哪儿机票隐藏

后端特征

以 web 端访问为例,正常的浏览器请求包含有效的 UA,cookie,referer 等常规信息,爬虫要完全兼顾此类信息成本较高。同样的,此类方法由于不同的浏览器特质(比如微信内浏览器)也相对容易出现误伤。

设备指纹在安全风控反作弊领域常用手段之一,通过采集设备、浏览器的信息来唯一的标识设备,通过此属性能很大程度的标识是否是无头浏览器、模拟器以及直接通过后端接口爬取数据。

通过 header 的检测方法实时性更强,其无需大量的数据积累,在兼顾准确性与误伤的同时,能更快的识别爬虫,而代价为了控制误伤就是遗漏率略高。

频次特征

访问频次是爬虫检测最基本的手段之一,正常用户的在浏览内容时,在单位时间内(比如1min,10 min,1hour)访问某个 path 的数值不会超过过某一阈值,这一阈值和访问具体 path 也相关,比如 zl 内容一般较多,所以阈值相对较低,而 comment 则相对较高。但是爬虫、尤其是初级爬虫,毕竟是机器访问很容易突破这一阈值,被反爬系统限制。

访问频次的检测手段对于不同的维度 member、ip、device 要有不同的阈值,尤其是 ip 维度,因为存在 “网关 ip” 这种特例,阈值相对 member 要高一些,但这样仍会会存在爬虫混在正常用户的群体中的 demo,比如校园出口 ip,学生是初级爬虫的主要生产者之一,所以 ip 访问频次的阈值虽然要相对 member、device 要大一些,但仍不能过被爬虫钻空子。

这种情况下,在限制爬虫时可以区分对待,当限制主体为 ip 时,对于已经登录的用户不会产生影响,但未登录用户会被限制,而登录用户则在以 member、device 为识别限制主体。

通过频次识别爬虫在应对初级爬虫或首次爬虫时效果较明显,能起到一定的保护后端服务以及数据的效果,但当爬虫通过调试后,能够逐渐摸索出我们的访问阈值从而控制自身的访问频率,避免被识别。此时,爬虫通分布式多 ip 策略仍能够大量、快速的爬取数据。

对于此类情况,基于行为特征的识别能发挥更大的作用。

访问规律

对于爬虫而言,其行为相较正常访问用户,易出现以下特点:

  • 访问 url 种类单一
  • 某一类 url 访问量占比高
  • 访问间隔相对稳定
  • 单位时间内访问数相似
    ……

通过此类特征能搞很好检测出因为访问频率低而被频率策略遗漏的爬虫。比如对于爬取用户信息的爬虫,其访问特征很明显,不同于正常用户基于内容的访问,其大部分访问集中在用户类的 url 上,所以在其整体访问较集中、访问 url 的离散度过低,通过这一特点很容易可以识别出此类爬虫。

对于暴露后端接口的服务,很多爬虫通过直接拉取后端接口的方式既可获得结构化的数据,这种网站简直是爬虫的最爱,省去的解析 html 的过程。当然这类爬虫也相对比较好防治,相对正常的浏览器访问,这类防虫不会访问一些浏览器必定会触发的 ajax 请求,所以通过对比两类数据可以相对比较容易识别。

对于抓取内容的爬虫,其和正常用户最大的不同在于,用户会受内容质量、个人兴趣习惯等多种因素的影响,导致其对于不同内容的停留时间不同,请求之间的访问间隔不一,单位时间内访问量不规律,而爬虫在这一点上模仿难度较高,普通一些的爬虫在这些特征的表现比较明显,通过制定简单的策略规则既可以识别,高级一些的爬虫会使用随机访问等掩护手段,在这时普通的规则效果较差,可以通过算法模型比如 SVM、HMM 等模型分析。

IP

对于爬虫而言,更换 ip 是非常普及的一个手段,通过代理 ip 爬取更是爬虫的通用手段,很多代理 ip 网站都提供免费的代理 ip,github 上也有很多实时爬取这些代理网站 ip 并验证可用的资源,获取成本极低。对于反爬而言,收集这类资源同样有必要,和上述特征结合使用,能极大的提高准确率,当然有条件的可以购买一些第三方的数据情报。
西刺代理

除了代理 Ip 外,爬虫另外一个通用的手段就是动态拨号,对于这类ip ,需要注意控制误伤,避免长期封禁对正常用户的影响:
动态拨号ip

其他更换 ip 的手段还包括 “tor 洋葱网络(已被墙,延时较高)”,相对前两者使用人数较少。

验证、拦截

403

比较通用的反爬拦截手段就是直接拦截掉用户的请求,返回 403 (或者其他状态码),切断爬虫的访问,但是这种情况比较适用于识别准确率较高的场景,对于疑似爬虫的请求一般采用验证码方式拦截。

验证码

验证码是各大厂拦截爬虫比较通用的手段之一,从最简单的字符验证码到 js 拖动验证码等等,通过算法也是一一被攻克:
GitHub 开源资源

除了算法破解,还有打码平台的存在,直接人工识别,更是大大降低了验证码的效果:
人工打码平台

现在单纯的通过验证码已经能比较容易的被破解,google 的新式验证码效果极佳,对于误伤用户的体验也很好,他直接提供一个点击框,通过对比用户的行为特征、采集浏览器信息等(还有一些google 没有透漏的特征)能比普通验证码效果更好(据说区分人类和机器之间的微妙差异,在于他/她/它在单击之前移动鼠标的那一瞬间。),目前 stackoverflow 也使用了类似的验证码。所以在验证码页面采集多采集一些行为信息,设备信息等等可以作为进一步识别爬虫的依据。

投毒

投毒就是对于爬虫和正常人的返回不同的结果,给爬虫以假象,让其自己为爬取到了真实数据。这同样要求识别率较高,否则造成误伤对用户体验过差。

其他

蜜罐

蜜罐也是安全圈中比较常见的一种手段,但是对于细心的爬虫来说,往往不易上当。

黑名单

对于被准确识别出来的爬虫,其 ip、member 信息可以构建一套自己的黑名单库,积累这种资源类数据,毕竟代理 ip、失控帐号这类数据复用相对还是比较高的。

一些反爬虫的资料

爬虫识别

http://www.freebuf.com/articles/web/137763.html
http://bigsec.com/bigsec-news/anan-16825-Antireptile-zonghe
https://www.zhuyingda.com/blog/article.html?id=8
http://www.sohu.com/a/166364494_505779
http://www.cqvip.com/main/export.aspx?id=672889284&
https://github.com/equalitie/learn2ban
https://patents.google.com/patent/CN103631830A/zh
http://www.xueshu.com/jsjyxdh/201704/28789559.html
http://www.sohu.com/a/207384581_609376
https://www.zhuyingda.com/blog/b17.html

代理 ip 收集

https://github.com/luyishisi/Anti-Anti-Spider/blob/master/7.IP%E6%9B%B4%E6%8D%A2%E6%8A%80%E6%9C%AF/README.md
https://github.com/SpiderClub/haipproxy

验证码

https://www.urlteam.org/2017/03/tensorflow%E8%AF%86%E5%88%AB%E5%AD%97%E6%AF%8D%E6%89%AD%E6%9B%B2%E5%B9%B2%E6%89%B0%E5%9E%8B%E9%AA%8C%E8%AF%81%E7%A0%81-%E5%BC%80%E6%94%BE%E6%BA%90%E7%A0%81%E4%B8%8E98%E6%A8%A1%E5%9E%8B/

前端反爬

http://imweb.io/topic/595b7161d6ca6b4f0ac71f05
https://www.urlteam.org/2016/11/%E5%9B%9B%E5%A4%A7%E8%A7%86%E9%A2%91%E7%BD%91%E7%AB%99%E5%8F%8D%E7%88%AC%E8%99%AB%E6%8A%80%E6%9C%AF%E7%A0%94%E7%A9%B6/

Hive-Transform-Python:快捷的Map/Reduce

Hive 提供了 Transform 这一关键字,使用 python 脚本处理hive 的数据,实现 Map/Reduce 的效果,在一些场景下,相比直接编写 Hadoop MR 要方便不少。

简介

首先简要介绍一下 hive sql 语句的编写逻辑以及 python 脚本的编写方法。

hive 部分编写

hive transform sql 的一个很常用的模式是:

  • hive sql 通过查询语句获取输入源数据
  • 调用 python 脚本 MAP、REDUCE 处理数据
  • hive sql 将 python 的处理结果入库或其他操作后续操作

执行 hive 前要先加载 python 脚本,脚本可以上传到 hdfs 上,通过语句 ADD FILE hdfs://xxxx 加载。

比如从 表A 中 读取数据,通过 python MAP/REDUCE脚本处理后将处理结果写入表 B,对应的 hive 语句约为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ADD FILE hdfs://xxxx;
FROM (
FROM (
SELECT *
FROM TABLE-A
) T
MAP T.a, T.b, T.c
USING 'python ./map.py'
AS d, e, f
CLUSTER BY d
) map_out
INSERT OVERWRITE TABLE-B
REDUCE map_out.d, map_out.e, map_out.f
USING 'python ./reduce.py'
AS (g ,h ,i)

python 部分编写

python 脚本的处理逻辑大概可以分为三部分:

  • 从 hive 获取输入数据
  • map、reduce 操作
  • 输出数据给 hive

其中输入、输出部分是利用系统标准输入输出流实现的,python 从 sys.stdin 中获取 hive 传入的数据,将处理结果通过 sys.stdout 传给 hive。

python 标准输入获取的每一行对应 hive sql 的一条数据,每一行通过 \t 区分 hive 表的各个字段值。同样的,输出给 hive 的每一行中不同的字段值也要通过 ‘\t’ 连接,否则 hive 会解析错误。

以 map 处理为例,python 脚本通过用的模式如下:

1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env python
# coding: utf8
import sys
def map_field(a,b):
return a+1, b+1
for line in sys.stdin:
a, b = line.split('\t')
c, d = map_field(a,b)
print c + '\t' + d

Example

下面介绍一个使用 hive-transform 统计用户 Get/Post 请求数的例子。在这个例子中将从一张记录所有用户请求记录的表 member_source_request 中读取源数据,并过滤掉 OPTION 等请求,只统计 GET,POST,PUT,DELET 四种请求。并将记录结果写入到一张 member_method_count 表中。

member_source_request 表 schema:

Name Type
1 member_id int
2 method string
3 url string
4 ip string
5

创建 member_method_count 表:

1
2
3
4
5
6
7
8
CREATE TABLE tmp.member_method_count(
member_id INT,
get_request BIGINT,
put_request BIGINT,
post_request BIGINT,
delete_request BIGINT
)
partitioned by (`date` INT)

Hive SQL

ADD FILE hdfs:///member-method.py; 
FROM (
    FROM (
        SELECT *
        FROM tmp.member_source_request
        WHERE member_id is not null
    ) T
    MAP T.member_id, T.method, T.url
    USING 'python ./member-method.py --mapper'
    AS member_id, method, url
    CLUSTER BY member_id
) map_out
INSERT OVERWRITE TABLE tmp.member_method_count PARTITION (date=20180220)
REDUCE map_out.member_id, map_out.method, map_out.url
USING 'python ./member-method.py --reducer'
AS member_id, get_request, put_request, post_request, delete_request

python 脚本

MemberRequestJob 为具体实现 map、reduce 逻辑,其父类可服用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/env python
# coding: utf8
import sys
from collections import defaultdict
class MRJob(object):
def __init__(self, sep='\t'):
self.sep = sep
def map(self, line):
raise NotImplementedError()
def reduce(self, key, value):
raise NotImplementedError()
def map_end(self):
pass
def reduce_end(self):
pass
def run_mapper(self):
for line in sys.stdin:
line = line.strip('\n').strip('\t')
self.map(line)
if hasattr(self, 'map_end'):
self.map_end()
def run_reducer(self):
for line in sys.stdin:
line = line.strip('\n').strip('\t')
key, value = line.split(self.sep, 1)
self.reduce(key, value)
if hasattr(self, 'reduce_end'):
self.reduce_end()
def output(self, key=None, value=None):
print str(key) + self.sep + str(value)
def run(self):
if len(sys.argv) <= 1:
raise Exception('--mapper or --reducer must be set')
self.args = tuple(sys.argv[2:])
if sys.argv[1] == '--mapper':
self.run_mapper()
elif sys.argv[1] == '--reducer':
self.run_reducer()
class FieldMRJob(MRJob):
def __init__(self, field_sep='\t', sep='\t'):
MRJob.__init__(self, sep)
self.field_sep = field_sep
def map_fields(fields):
raise NotImplementedError()
def reduce_fields(key, fields):
raise NotImplementedError()
def map(self, line):
fields = line.split(self.field_sep)
self.map_fields(fields)
def reduce(self, key, value):
values = value.split(self.field_sep)
self.reduce_fields(key, values)
def output(self, key=None, values=()):
value = self.field_sep.join(map(str, values))
MRJob.output(self, key, value)
class MemberRequestJob(FieldMRJob):
def __init__(self):
FieldMRJob.__init__(self)
self.all_member = set()
self.all_get_counts = defaultdict(int)
self.all_post_counts = defaultdict(int)
self.all_put_counts = defaultdict(int)
self.all_delete_counts = defaultdict(int)
def map_fields(self, fields):
member_id, method, url = fields
if method in ['GET', 'PUT', 'POST', 'DELETE']:
self.output(member_id, (method, url))
def reduce_fields(self, member_id, fields):
method, url = fields
self.all_member.add(member_id)
if method == 'GET':
self.all_get_counts[member_id] += 1
elif method == 'POST':
self.all_post_counts[member_id] += 1
elif method == 'PUT':
self.all_put_counts[member_id] += 1
else:
self.all_delete_counts[member_id] += 1
def reduce_end(self):
for member_id in list(self.all_member):
self.output(member_id, (self.all_get_counts.get(member_id, 0),
self.all_put_counts.get(member_id, 0),
self.all_post_counts.get(member_id, 0),
self.all_delete_counts.get(member_id, 0)
)
)
if __name__ == '__main__':
MemberRequestJob().run()

Hive 基本语法

建表

通用建表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE TABLE IF NOT EXISTS `Db.Table`(
`uuid` string,
`user_id` int,
`user_ip` string,
`created` int,
`user_agent` string,
`device_id` bigint,
`referer` string)
COMMENT 'This is a test table'
PARTITIONED BY (ptdate string)
CLUSTERED BY(userid) SORTED BY(created) INTO 64 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
STORED AS TEXTFILE
LOCATION 'hdfs://localhost:8020/user/test/logs/a'

利用查询结果建表

1
2
3
4
CREATE TABLE IF NOT EXISTS `Db.Select` AS
SELECT *
FROM TABLE_TEST
WHERE XX=XX

建表并复制表结构

1
CREATE TABLE IF NOT EXISTS `Db.Table` LIKE `Db.Tablelike`

Tips

  1. TABLEEXTERNAL TABLE 主要区别在于表数据的存储位置,TABLE 创建表后会到 HDFS 加载数据,并将数据移动到数据仓库目录下,因次删除表时对应的 HDFS 和表的元数据一起被删除,但是 EXTERNAL TABLE 的实际数据仍在存在 LOCATION 'hdfs://localhost:8020/user/test/logs/a' 对应的 HDFS 路径下,所以删表时 HDFS 数据仍在存在,只是 hive 表的元数据被删除。另外还有 TEMPORARY TABLE,只在当前登录用户 session 有效,失效后被删除。
  2. PARTITIONED BY(XX type) 指定 Hive 表按指定字段分区,一个 partition 对应数仓下表的一个目录,可以理解为设置的 partition 列的索引,一个通用的 case 就是 hive 表数据按天切成多个 partition。
  3. CLUSTERED BY 指定了 hive 表按指定列基于 hash 分桶并按在桶内按指定列排序,桶是比分区更细粒度的数据划分,同时支持排序,在一些查询场景下(比如抽样)查询处理效率更高。
  4. ROW FORMAT DELIMITD 指定了数据行的格式,表示支持列分隔符,每行数据通过 \t 区分 filed ,每个 filed 内如果是 array 则通过 , 分区元素,如果是 map 则通过 : 区分 key 和 value。ROW FORMAT 还支持其他格式,eg:
    JSON:
    1
    2
    ROW FORMAT SERDE
    'org.openx.data.jsonserde.JsonSerDe'

正则:

1
2
3
4
5
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = " (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\".*\") ?"
)

  1. STORED AS TEXTFILE,指定了数据的存储格式,表示以纯文本形式存储。其他还包括:
    1
    2
    3
    4
    5
    6
    7
    8
    # 文件存储格式
    : SEQUENCEFILE
    | TEXTFILE -- (Default, depending on hive.default.fileformat configuration)
    | RCFILE -- (Note: Available in Hive 0.6.0 and later)
    | ORC -- (Note: Available in Hive 0.11.0 and later)
    | PARQUET -- (Note: Available in Hive 0.13.0 and later)
    | AVRO -- (Note: Available in Hive 0.14.0 and later)
    | INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname

修改表

hive 新增列

1
ALTER TABLE `Db.Table` ADD COLUMNS user_level INT

hive 改列

修改列的类型

1
ALTER TABLE `Db.Table` CHANGE user_id user_id STRING

修改列名

1
ALTER TABLE `Db.Table` CHANGE user_id new_user_id INT

修改列名后改变列在表中的位置

1
2
3
4
放在最前:
ALTER TABLE `Db.Table` CHANGE user_id new_user_id INT FIRST
放在列 uuid 后:
ALTER TABLE `Db.Table` CHANGE user_id new_user_id INT AFTER uuid

hive 新增分区

1
2
ALTER TABLE `Db.Table` ADD PARTITION ( ptdate='2017-09-28')
LOCATION 'hdfs://localhost:8020/user/test/logs/a/2017-09-28';

hive 删分区

1
ALTER TABLE `Db.Table` DROP PARTITION ( ptdate='2017-09-28')

删表

清空全表数据

1
TRUNCATE TABLE `Db.Table`

清空表指定 partition 数据

1
TRUNCATE TABLE `Db.Table` PARTITION (ptdate='2017-09-28')

hive 删表

1
DROP TABLE IF EXISTS `Db.Table`

写入数据到表

文件数据写入

1
2
3
# `OVERWRITE` 表示覆盖原表 partition
LOAD DATA INPATH "hdfs://localhost:8020/user/test/logs/test.txt"
OVERWRITE INTO TABLE `Db.Table` PARTITION (ptdate='2017-09-28')

注意:test.txt 文件格式要符合建表时 ROW FORMAT 配置, eg: 使用上面建表的 ROW FORMAT 配置,则下表中 array 类型 的 ip 和 map 类型的 request 格式如下:

user_id(int) \t created(int) \t ip(array) \t request(map)
123 456 11.11.11.11,22.22.22.22 1:1,2:2

查询结果写入

1
2
3
4
5
# `OVERWRITE` 表示覆盖原表 partition
INSERT OVERWRITE TABLE `Db.Table` PARTITION (ptdate='2017-09-28')
SELECT user_id, created, ip, request
FROM `Db.TableSource`
WHERE ptdate='2017-09-28' and user_id=123

or

1
2
3
4
FROM `Db.TableSource`
INSERT OVERWRITE TABLE `Db.Table` PARTITION (ptdate='2017-09-28')
SELECT user_id, created, ip, request
WHERE ptdate='2017-09-28' and user_id=123

Python 读写 hbase 数据的正确姿势(五)

为什么一条异常的连接会出现在 connect pool 中,而且总会拿到这条连接 ?

在本系列第篇文章末尾提出了这样一个问题 『 为什么一条异常的连接会出现在 connect pool 中,而且总会拿到这条连接 』,本文将继续深入这个问题,找到其根本原因。

问题分析

弄清这个问题,要深入 happybase 的代码看一下具体实现逻辑。

happybase 的连接池要求必须要使用 context manager,以下是其通过 context manager 获取/归还 Connection 的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@contextlib.contextmanager
def connection(self, timeout=None):
# 获取一个 Connection 对象
connection = getattr(self._thread_connections, 'current', None)
return_after_use = False
if connection is None
return_after_use = True
connection = self._acquire_connection(timeout)
with self._lock:
self._thread_connections.current = connection
try:
# 打开连接,并返回一个可用的连接给contextmanager
connection.open()
yield connection
except (TException, socket.error):
# 捕获 Trhift 和 socket 的异常和错误,则需要 refresh thrift client,保证最后归还的 Connection 的 thrift client 是可用的。
logger.info("Replacing tainted pool connection")
connection._refresh_thrift_client()
connection.open()
raise
finally:
# 最终会归还这条连接
if return_after_use:
del self._thread_connections.current
self._return_connection(connection)

实现逻辑大概可以归纳为:

  • 在 pool 初始化的时候构建一个包含若干 Connection 的 queue 以及一个 lock。
  • 当向连接池请求连接时,open 一个 Connection 并返回,在这个过程中会 catch TException 以及 socket.error。
  • 如果 catch 到以上 error/exception 则会刷新 Connection 对象的thrift_client,保证在退出 context manager 时返回给队列的 Connection 是可用的。
  • 退出 context manager 前将 Connection 返还。

从以上逻辑可以发现,happybase 是通过 context manager 保证 Connection 在退出时是正常的,而我们的场景中出现了 socket.error 却并没有被 catch 住,说明有可能是错误发生在 context manager 之外,回到代码:

1
2
3
4
5
6
7
8
9
def recent_events_v1(start, end, table=None, filter_str=None, limit=2000):
with get_connetion_pool().connection() as conn:
if table is not None:
t = conn.table(table)
else:
t = conn.table(TABLE)
start_row = 'ARTICLE' + str(start * 1000000)
end_row = 'ARTICLE' + str(end * 1000000)
return t.scan(row_start=start_row, row_stop=end_row, filter=filter_str, limit=limit)

从代码可以发现,再使用连接池的过程中,退出 context manager 前直接 return table.scan() ,而 scan 方法会创建一个 scanner ,最终返回一个 generator,到这里基本可以说是水落石出了!

问题原因

因为 generator 的特性,在退出 context manager 前并没有发生真正的查询,所以这时返回给 connent pool 的 Connection 仍然是没有问题的。只用在真正遍历这个 generator 时才会发生数据查询,而这个过程肯定在 context manager 之外,所以此时如果出现 error 则不会再有类似的 catch 逻辑去保证这条 Connection 在发生异常时去刷新 thrift_client,最终导致这条已经归还给 pool 的 Connection 失效了。

同时这里还会存在另一个问题,在并发场景下,遍历 generator 发生查询时可能这个 Connection 已经被分配给其他线程使用了,导致这个 Connection 同时被两个线程所有,出现一些难以预测的问题。

解决问题

既然发现了问题的根因,解决起来就比较简单了,只要保证所有使用 Connection 的逻辑都发生在 context manager 内就好,所以这里可以把遍历generator 的逻辑放在 context manager 内,最终返回一个 list 对象而不是 generator,代码如下:

1
2
3
4
5
6
7
8
9
10
def recent_events_v4(start, end, table=None, filter_str=None, limit=2000):
with get_connetion_pool().connection() as conn:
if table is not None:
t = conn.table(table)
else:
t = conn.table(TABLE)
start_row = 'ARTICLE' + str(start * 1000000)
end_row = 'ARTICLE' + str(end * 1000000)
result_generator = t.scan(row_start=start_row, row_stop=end_row, filter=filter_str, limit=limit)
return [l for l in result_generator]

从下图可以看出,修正后的代码在运行的过程中出现 TTransportException后,不会在出现 Broken pipe

不会在出现 Broken pipe

继续思考

上面分析了『为什么一条异常的连接会出现在 connect pool 中』,那 『而且总会拿到这条连接 』 又是为什么?

仔细观察 happybase 获取 connection 的前半部分逻辑可以发现,其是优先从self._thread_connections 获取链接对象,当获取不到时才通过 self._acquire_connection 从 pool 中取。

这个 self._thread_connections 是个什么东西?这是一个 thread local 变量,即线程自有的局部变量,其他线程不可访问,happybase 源码:

1
self._thread_connections = threading.local()

某个线程第一次请求获取 Connection 时,通过这个 thread local 变量,把分配给它的 Connection 记录下来,当下次这个线程再请求时,则优先把这个 thread local 变量记录的 Connection 返回给线程。

因为我们是单线程场景,所以每次返回给主进程的都是同一个有问题的 Connection 对象,这就解释了 『总会拿到问题链接』 这个问题。

小结

综上,在使用连接池的场景中,注意类似 scan 这种具有延时行的操作,一定要放在 context mananer 内,才能保证连接池内的连接可用~

Python 读写 hbase 数据的正确姿势(四)

问题4: 查询异常 TApplicationException: Missing result

在上一篇文章中讨论了线上测试时出现 [Errno 32] Broken pipe 错误,这里继续分析另一个错误 TApplicationException: Missing result

问题描述

在解决了问题 3 后,又遇到了古怪的错误:在查询的过程中,出现了大量的 TApplicationException: Missing result 错误:

1
2
3
4
5
6
7
File "/usr/local/lib/python2.7/site-packages/happybase/table.py", line 402, in scan
self.name, scan, {})
File "/usr/local/lib/python2.7/site-packages/thriftpy/thrift.py", line 198, in _req
return self._recv(_api)
File "/usr/local/lib/python2.7/site-packages/thriftpy/thrift.py", line 234, in _recv
raise TApplicationException(TApplicationException.MISSING_RESULT)
thriftpy.thrift.TApplicationException: Missing result

而且,在大量出现此类错误之前伴有 timeout: timed out 超时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
File "/usr/local/lib/python2.7/site-packages/happybase/table.py", line 415, in scan
scan_id, how_many)
File "/usr/local/lib/python2.7/site-packages/thriftpy/thrift.py", line 198, in _req
return self._recv(_api)
File "/usr/local/lib/python2.7/site-packages/thriftpy/thrift.py", line 210, in _recv
fname, mtype, rseqid = self._iprot.read_message_begin()
File "thriftpy/protocol/cybin/cybin.pyx", line 429, in cybin.TCyBinaryProtocol.read_message_begin (thriftpy/protocol/cybin/cybin.c:6325)
File "thriftpy/protocol/cybin/cybin.pyx", line 60, in cybin.read_i32 (thriftpy/protocol/cybin/cybin.c:1546)
File "thriftpy/transport/buffered/cybuffered.pyx", line 65, in thriftpy.transport.buffered.cybuffered.TCyBufferedTransport.c_read (thriftpy/transport/buffered/cybuffered.c:1881)
File "thriftpy/transport/buffered/cybuffered.pyx", line 69, in thriftpy.transport.buffered.cybuffered.TCyBufferedTransport.read_trans (thriftpy/transport/buffered/cybuffered.c:1948)
File "thriftpy/transport/cybase.pyx", line 61, in thriftpy.transport.cybase.TCyBuffer.read_trans (thriftpy/transport/cybase.c:1472)
File "/usr/local/lib/python2.7/site-packages/thriftpy/transport/socket.py", line 108, in read
buff = self.sock.recv(sz)
timeout: timed out

服务端没有任何异常。

问题分析

看起来这种情况和上文问题 3 中Broken pipe + TTransportException 的错误组合模式比较类似,所以猜测timeout 是导致这种现象的导火索,为了验证猜想,尝试在测试环境手动复现错误场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
conn_pool = None
TABLE = 'article'
# 本地环境 timeout 设置为1 时 超时较多
# 生产环境为 10
def get_connetion_pool(timeout=1):
global conn_pool
if conn_pool is None:
conn_pool = happybase.ConnectionPool(1, timeout=timeout)
return conn_pool
def recent_events_v3(start, end, table=None, filter_str=None, limit=2000):
with get_connetion_pool().connection() as conn:
if table is not None:
t = conn.table(table)
else:
t = conn.table(TABLE)
start_row = 'ARTICLE' + str(start * 1000000)
end_row = 'ARTICLE' + str(end * 1000000)
return t.scan(row_start=start_row, row_stop=end_row, filter=filter_str, limit=limit)
def main():
# 问题4复现
for i in range(100):
# 有timeout,有 Missing result,有正常查询
try:
results = recent_events_v3(start=0, end=1505646570, table="test_article_java_2")
print len([i for i in results])
except Exception as e:
print e
print '#########################################'

运行结果如下:

image.png

而当把 timeout 增加到一个较大值时则不会出现这种情况。印证了猜想 TApplicationException: Missing result 异常前一定出现过 timeout

解决问题

增大 timeout 后,可以很大程度上减少这样的情况发生,但是 timeout 不同于问题 3 的 IllegalArgumentException 错误,可以主动控制,使用 scan 的查询场景以及网络环境本身(在稳定的场景,仍有可能出现抖动导致超时) 难以避免的会出现 timeout,所以仅仅增加 timeout 值,仍然后可能会出现这种情况。

问题 3 会出现 Broken Pipe 错误是因为之前发生错误导致连接失效,后续再使用异常连接时则会报错。问题 4 是否也是因为 timeout 后导致连接出问题,然后出现这种情况呢?

尝试验证这种猜想:在发生 timeout 时,catch 住并重新初始化连接池然后重试:

1
2
3
4
5
6
7
8
9
10
11
12
13
def main():
# 问题4修复
for i in range(30):
# 没有 Missing result,只有 timeout 和 有正常查询
try:
results = recent_events_v3(start=0, end=1505646570, table="test_article_java_2")
print len([i for i in results]) # 期望值为2, 实际报错
except socket.timeout:
conn_pool = None # catch timeout 后, 清空连接池,下次使用时重新初始化, 仅限单线程模型 !
print 'time out: reinit conn pool!'
# print traceback.format_exc()
# 不会在出现 `TApplicationException: Missing result` 错误
print '#########################################'

修改后运行测试代码只会出现 timeout,不会出现其他错误:

timeout re init

因为生产环境是多容器,每个容器单进程,在这种场景下连接池和一个全局变量连接的意义相差不大,整个连接池同一时刻只会被一个进程使用(所以连接池只初始化了 1 条连接),所以直接重置连接池是可以的,此时可以彻底避免 Missing results ~

继续思考

上文文末提到一个疑惑 :为什么一条异常的连接会出现在 connect pool 中,而且总会拿到这条连接 ?

这次同样还有另一个问题值得思考:timeout 后为什么会出现大量的 Missing results 错误?是否如同猜测的那样 timeout 后连接池中的连接失效了?

下篇文章见~

Python 读写 hbase 数据的正确姿势(三)

问题3: 查询异常 [Errno 32] Broken pipe

这篇文章将继续 python-hbase 这一话题,讨论一个在线上环境中出现的很有意思的问题。

问题描述

同样是前面文章中描述的类似查询场景,生产测试,在调用 hbase thrift 接口时,日志中捕获到大量的 [Errno 32] Broken pipe 错误,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
File "/usr/local/lib/python2.7/site-packages/happybase/table.py", line 402, in scan
self.name, scan, {})
File "/usr/local/lib/python2.7/site-packages/thriftpy/thrift.py", line 195, in _req
self._send(_api, **kwargs)
File "/usr/local/lib/python2.7/site-packages/thriftpy/thrift.py", line 206, in _send
self._oprot.write_message_end()
File "thriftpy/protocol/cybin/cybin.pyx", line 463, in cybin.TCyBinaryProtocol.write_message_end (thriftpy/protocol/cybin/cybin.c:6845)
File "thriftpy/transport/buffered/cybuffered.pyx", line 80, in thriftpy.transport.buffered.cybuffered.TCyBufferedTransport.c_flush (thriftpy/transport/buffered/cybuffered.c:2147)
File "/usr/local/lib/python2.7/site-packages/thriftpy/transport/socket.py", line 129, in write
self.sock.sendall(buff)
File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe

问题分析

[Errno 32] Broken pipe 往往意味着连接问题,连接的一端已经关闭连接,但是另一端仍然使用这个连接向对方发送数据。

经过和平台组同学的排查测试,排除了生产环境中链路质量的问题以及 hbase thrift server 稳定性的问题。如果不是网络问题导致,那有没有可能是 hbase 处理请求的过程中发生了错误,主动关闭了连接?为了进一步追查问题,在本地还原了线上场景,复现错误。

果不其然,还原现场后客户端最早收到了一个额外的异常,这之后才收到大量的 Broken Pipe 错误,新的异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
File "/usr/local/lib/python2.7/site-packages/happybase/table.py", line 402, in scan
self.name, scan, {})
File "/usr/local/lib/python2.7/site-packages/thriftpy/thrift.py", line 198, in _req
return self._recv(_api)
File "/usr/local/lib/python2.7/site-packages/thriftpy/thrift.py", line 210, in _recv
fname, mtype, rseqid = self._iprot.read_message_begin()
File "thriftpy/protocol/cybin/cybin.pyx", line 429, in cybin.TCyBinaryProtocol.read_message_begin (thriftpy/protocol/cybin/cybin.c:6325)
File "thriftpy/protocol/cybin/cybin.pyx", line 60, in cybin.read_i32 (thriftpy/protocol/cybin/cybin.c:1546)
File "thriftpy/transport/buffered/cybuffered.pyx", line 65, in thriftpy.transport.buffered.cybuffered.TCyBufferedTransport.c_read (thriftpy/transport/buffered/cybuffered.c:1881)
File "thriftpy/transport/buffered/cybuffered.pyx", line 69, in thriftpy.transport.buffered.cybuffered.TCyBufferedTransport.read_trans (thriftpy/transport/buffered/cybuffered.c:1948)
File "thriftpy/transport/cybase.pyx", line 61, in thriftpy.transport.cybase.TCyBuffer.read_trans (thriftpy/transport/cybase.c:1472)
File "/usr/local/lib/python2.7/site-packages/thriftpy/transport/socket.py", line 125, in read
message='TSocket read 0 bytes')
TTransportException: TTransportException(message='TSocket read 0 bytes', type=4)

这个异常的大概意思是 server 端发生了异常并没有返回任何数据,扒一下 hbase server 的日志,又发现了一个有趣的异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java.lang.IllegalArgumentException: Incorrect Filter String
at org.apache.hadoop.hbase.filter.ParseFilter.extractFilterSimpleExpression(ParseFilter.java:226)
at org.apache.hadoop.hbase.filter.ParseFilter.parseFilterString(ParseFilter.java:174)
at org.apache.hadoop.hbase.thrift.ThriftServerRunner$HBaseHandler.scannerOpenWithScan(ThriftServerRunner.java:1481)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.hbase.thrift.HbaseHandlerMetricsProxy.invoke(HbaseHandlerMetricsProxy.java:67)
at com.sun.proxy.$Proxy9.scannerOpenWithScan(Unknown Source)
at org.apache.hadoop.hbase.thrift.generated.Hbase$Processor$scannerOpenWithScan.getResult(Hbase.java:4613)
at org.apache.hadoop.hbase.thrift.generated.Hbase$Processor$scannerOpenWithScan.getResult(Hbase.java:4597)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.hadoop.hbase.thrift.TBoundedThreadPoolServer$ClientConnnection.run(TBoundedThreadPoolServer.java:289)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

这个异常中出现了解决问题的核心关键词 Incorrect Filter String,从字面理解来看是传到 hbase 的 filter 不正确,导致解析失败。

找到这条有问题的 filter:

“SingleColumnValueFilter(‘basic’, ‘ArticleTypeID’, =, ‘binary:\x00\x00\x00\x00\x03’T\xc9’)”

从语法上来看感觉并没有什么问题,同样的逻辑使用 java client 则完全正确。为了一探究竟拉出 hbase 抛错部分的源码看看为什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public byte [] extractFilterSimpleExpression (byte [] filterStringAsByteArray,
int filterExpressionStartOffset)
throws CharacterCodingException {
int quoteCount = 0;
for (int i=filterExpressionStartOffset; i<filterStringAsByteArray.length; i++) {
if (filterStringAsByteArray[i] == ParseConstants.SINGLE_QUOTE) {
if (isQuoteUnescaped(filterStringAsByteArray, i)) {
quoteCount ++;
} else {
// To skip the next quote that has been escaped
i++;
}
}
if (filterStringAsByteArray[i] == ParseConstants.RPAREN && (quoteCount %2 ) == 0) {
byte [] filterSimpleExpression = new byte [i - filterExpressionStartOffset + 1];
Bytes.putBytes(filterSimpleExpression, 0, filterStringAsByteArray,
filterExpressionStartOffset, i-filterExpressionStartOffset + 1);
return filterSimpleExpression;
}
}
throw new IllegalArgumentException("Incorrect Filter String");
}

从源码中可以看到在两种情况下会 raise exception:

  1. filter 不是以 ParseConstants.RPAREN 结尾,即不是以 ) 结尾
  2. quoteCount不是偶数, 即单引号的数量不是偶数

到此,可以真相大白了,从代码中可以看到 hbase parse 的过程中是通过单引号提取参数的,而我的 filter 中有一个整型参数在转成 bytes 后包含单引号,影响了 hbase 解析 filter 参数,并最终导致 quoteCount 不是偶数,然后抛出异常。

解决问题

定位到问题根本后,解决问题就 so easy 了。解析 filter 的源码中用到了 escape quote 的方法 isQuoteUnescaped,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
public static boolean isQuoteUnescaped (byte [] array, int quoteIndex) {
if (array == null) {
throw new IllegalArgumentException("isQuoteUnescaped called with a null array");
}
if (quoteIndex == array.length - 1 || array[quoteIndex+1] != ParseConstants.SINGLE_QUOTE) {
return true;
}
else {
return false;
}
}

逻辑很简单,判断单引号下一下字符是否仍然是单引号,如果是则被转义,跳过检查。所以我们的filter 只需要通过 两个单引号 替换参数中的 一个单引号即可,eg:

1
2
hbase_int_filter_template = "SingleColumnValueFilter('a', '{property}', {symbol}, 'binary:{threshold}')"
f = hbase_int_filter_template.format(property=params[0], symbol=flag, threshold=struct.pack('>q', threshold).replace("'", "''"))

单引号转义后,没有再出现这两类 exception。

后续分析

回头来看,因为 filter 使用错误,导致 hbase 解析 filter 异常,hbase server 抛出异常,并中断连接,client 收到 TTransportException 异常,此时这条连接已经失效,但是仍在 connection pool 中,所有后续从 connection pool 中获取连接时拿到这条连接后,再向 hbase 发送请求时 client 端不断收到 [Errno 32] Broken pipe 错误。

出现了一个新的思考题,为什么一条异常的连接会出现在 connect pool 中,而且总会拿到这条连接 ?

题外话

仔细想想 filter 中单引号需要转义这种情况按理说 hbase 会在官方的 Document 中提到才对,翻翻 user guide ,果然找到了 Filter Language

hbase filter language

使用前用心看看官方文档还是很有必要的,可以少踩许多坑…

Python 读写 hbase 数据的正确姿势(二)

问题1:小续

上一篇文章中讨论了,在使用 filter 查询 hbase 的过程中,使用python 容易忽略的一个问题:存储整型数据的时候,容易忽略将整型数据转换成 bytes 数据进行存储,进而使用 java filter 过滤时无法过滤出正确的结果。

仔细分析这个问题的发生的过程:

  1. 使用 python 将整型数据使用 str() 强转成字符串存入 hbase
  2. 使用 java 的相关 filter,传入参数时直接使用 Bytes.toBytes(1) 方法将整型转成 bytes 查询
  3. 无法得出正确的结果

其发生的根本原因是: 存入 hbase 的 1 这个值是 str(1) ,而使用 java 查的时候传入的过滤参数是 int(1) 转成的 bytes,这两者本身就不是一个类型,所以才会查出异常的结果。因此如果想用 java 在这种场景下查出正确的结果还有另一种方法,即传入的过滤参数是str(1) 转成的 bytes!

1
2
3
4
5
6
7
8
9
10
11
12
13
SingleColumnValueFilter filter1 = new SingleColumnValueFilter(Bytes.toBytes("basic"),
Bytes.toBytes("ArticleTypeID"),
CompareOp.EQUAL, Bytes.toBytes("1"));
// 注意这里将传入的是字符串"1",而不是1L 这个整数
// Scan python table `test_article_1`
System.out.println("Prepare to scan !");
ResultScanner scanner = table.getScanner(s);
int num = 0;
for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
num++;
}
System.out.println("Found row: " + num);// 预期 50,结果为 50,查询到了正确的结果

不过在使用的过程中对于各种类型的数据最好还是通过相应的方法直接转成 bytes 存储比较好,因为字符串存储占据更大的空间。

问题2:scan 指定 start 和 end 时返回异常的结果

问题1讨论了一个 filter 过滤异常的问题,这次在使用 scan 指定 start、stop 做过滤时,又遇到了一个小问题。

问题重现

上文曾经提到查询频繁的场景是:按时间序列查出某一段时间创建的 articles,所以将 rowkey 设为 “ARTICLE” + 微秒级时间戳的形式,便于使用 scan 时指定 rowstart 和 rowstop。

但是在使用 python 查询的过程中又发生了一个有趣的问题,指定 rowstart 和 rowstop 分别是:

‘ARTICLE’ + str(1505024365 1000000)
‘ARTICLE’ + str((1505024365+10)
1000000)

因为测试数据是每秒写入一条,所以不加任何 filter ,指定以上 start 和 stop 时,预期结果数应为 10。

但是使用 python 查询出的结果确是错误的—— 0,而同样的代码去查询由使用 java 写入的数据时确能查到正确的结果。

寻找原因

很明显,使用 python 写入数据的逻辑仍然存在问题,再一次对比 python 和 java 写入 hbase 的数据。

使用上一篇文章中修正后的 python 代码(对应函数save_main_v2) 写入 hbase 的数据:
python 写入的数据

使用 java (对应函数test_hbase_filter1) 写入 hbase 的数据:
java 写入的数据

观察数据,可以发现: column、timestamp、value 部分经过修复已经一致了,但是 rowkey 区别很明显,python中 rowkey 是这样的:

ARTICLE1.50502434609e+15

java 中 rowkey 确是这样的:

ARTICLE1505030804083000

而 python scan hbase 的代码中,预期的 rowkey 格式和 java 写入的格式是对应的,但和 python 写入的格式则完全不一致。

1
2
3
4
print start_row
# 结果为 'ARTICLE1505030804083000'
# 和scan start 和 stop 对应
# 所以从 java 写入 hbase 的数据中查询得到正确结果

而上文中 python 的写入代码,其构造 rowkey 时的逻辑如下:

1
2
3
4
5
6
timestamp = time.time() + i
rowkey = "ARTICLE" + str(timestamp * 1000000)
print rowkey
# 结果为 'ARTICLE1.50502434609e+15'
# 是科学计数法表示的
# 所以无法匹配 scan 的逻辑

综上,可以得出结论,因为 python 中 time.time() 返回值为 float型,在其 timestamp * 1000000 扩展到微秒后,python 是采用科学计数法表示的,所以存入 hbase 中的值并不是预期的结果,从而导致后续查询异常。

正确的 scan 姿势

确定原因后问题就很好解决了,因为写入的 rowkey 中数值部分是 float 型,最终以科学计数法表示,所以可以 scan 查询时将传入的参数也变成 float 型,这样查询时传入的 rowkey 最终也会变成科学计数法表示的格式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def recent_events_v1(start, end, table=None, filter_str=None, limit=2000):
with get_connetion_pool().connection() as conn:
if table is not None:
t = conn.table(table)
else:
t = conn.table(TABLE)
start_row = 'ARTICLE' + str(start * 1000000)
end_row = 'ARTICLE' + str(end * 1000000)
return t.scan(row_start=start_row, row_stop=end_row, filter=filter_str, limit=limit)
def main():
results = recent_events_v1(start=0.0, end=1505024364.0, table="test_article_2")
# 这里传入的是 `1505024364.0` 而不是 `1505024364`,下同
print len([i for i in results]) # 期望值为50, 实际值为50
results = recent_events_v1(start=1505024365.0, end=1505024365.0 + 10, table="test_article_2")
print len([i for i in results]) # 期望值为10, 实际值为10

使用科学计数法存储的 rowkey 在 hbase 虽然也能 scan 出预期的效果,但是在对以科学计数法表示的 rowkey scan 时,rowkey 的前段和末段是相同的,不同的是中间 N 位,这样在按序 scan 时相比单调递增的 rowkey 不是很理想,参考 OpenTSDB 使用 Hbase 的方式,还是以数字型的时间戳结尾存储更加理想。

所以,更好地解决方案是,在写入 hbase 时,将 time.time() 返回的 float 型,转成整型后传入,使得 rowkey 以 “ARTICLE” + “ int-timestamp-us” 型存储,然后使用原来的 scan 方法去查询。

拓展思考

Hbase 要求使用 bytes,因为 bytes 更节省存储空间,更适合海量存储的场景。在上面的场景中,rowkey 的时间戳部分是微秒型,如果使用字符串存储,其长度为:

1
2
In [37]: len(bytes('ARTICLE1505645391083000'))
Out[37]: 23

如果将时间戳部分以整型转化成 bytes 在和前半部分拼接在一起作为 rowkey 存储显然能节省不少空间:

1
2
In [41]: len(bytes('ARTICLE\x00\x05Y`b\xb1u\xf8'))
Out[41]: 15

那在 scan 的时候,hbase 是否也支持这种混合二进制的字典序?是否也能按指定的 start、stop 查询到正确的结果?一试便知:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def save_main_v3():
datas = dict()
for i in range(100):
article_type_id = i % 2
timestamp = time.time() + i
rowkey = "ARTICLE" + struct.pack('>q', timestamp * 1000000)
data = {
"basic:" + "ArticleID": str(i),
"basic:" + "ArticleTypeID": struct.pack('>q', article_type_id),
"basic:" + "Created": str(timestamp),
}
datas[rowkey] = data
save_batch_events(datas, table="test_article_3")
def main():
# 问题2思考
save_main_v3() # 导入100 条数据,50条ArticleTypeID=0,50条ArticleTypeID=1
results = recent_events_v2(start=0, end=1505027700, table="test_article_3")
print len([i for i in results]) # 期望值为50, 实际值为50
results = recent_events_v2(start=1505027700, end=1505027700 + 10, table="test_article_3")
print len([i for i in results]) # 期望值为10, 实际值为10

Hbase 中存储的数据:
Hbase 中的 bytes rowkey

从以上结果能看出,使用 bytes 转化成整型的 rowkey 也是按字典序排列的,scan 可以得出预期的结果,当然这样的存储对人来说看起来比较别扭,可读性比较低,但这不影响机器,能大量的节省存储空间,明显是更优的选择。

Python 读写 hbase 数据的正确姿势(一)

之前操作 hbase 大都是用 java 写,或者偶尔用 python 写几个一些简单的 put、get 操作。最近在使用 happybase 库批量向 hbase 导入数据,并通过 java 实现查询的一些复杂的搜索时(scan+filter),遇到了一些有趣的问题。

实验版本

Hbase 版本:1.2.6
Happybase 版本:1.1.0
Python 版本:2.7.13

问题1:filter 过滤失败

问题重现

hbase 的使用场景大概是这样的:

有一个 hbase table,存储一些文章的基本信息,包括创建时间、文章ID、文章类别ID等,同属于一个column family,”article”。

查询的场景则是查找”指定的时间范围”,”文章类型ID为N” 的所有文章数据。

根据以上场景,设计如下 table:

  1. hbase table 为 article 。
  2. rowkey 是 “ARTICLE” + 微秒级时间戳(类似OpenTSDB 的rowkey,便于按时间序列查到某一段时间创建的 articles),即 “ARTICLE1504939752000000”。
  3. family 为 “basic”,包含 “ArticleID”, “ArticleTypeID”, “Created”, 三个 column。

查询时通过指定 rowkey start 和 rowkey stop,可以 scan 某一个时间段的数据(因为 rowkey 中包含数值型的时间戳),通过 hbase filter 实现”ArticleTypeID” == N 的过滤条件。

开始导入数据、准备查询,以下是导入数据部分代码 demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def save_batch_events(datas, table=None):
with get_connetion_pool().connection() as conn:
if table is not None:
t = conn.table(table)
else:
t = conn.table(TABLE)
b = t.batch(transaction=False)
for row, data in datas.items():
b.put(row, data)
b.send()
def save_main_v1():
datas = dict()
for i in range(100):
article_type_id = i % 2
timestamp = time.time() + i
rowkey = "ARTICLE" + str(timestamp * 1000000)
data = {
"basic:" + "ArticleID": str(i),
"basic:" + "ArticleTypeID": str(article_type_id),
"basic:" + "Created": str(timestamp),
}
datas[rowkey] = data
save_batch_events(datas)

查看一下 hbase 的数据,100 条数据全部正常导入,其中50条数据 “ArticleTypeID” 为0,50条为1

图 1:python-happyhbase 写入的数据

接下来就是用 hbase filter 过滤的过程了,假设查询 “ArticleTypeID” 为 0 的数据,使用 java 客户端实现查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void test_hbase_filter() throws IOException {
TableName tableName = TableName.valueOf("test_article_1");
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(tableName);
// Scan python table `test_article_1`
System.out.println("Prepare to scan !");
FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ONE);
SingleColumnValueFilter filter1 = new SingleColumnValueFilter(Bytes.toBytes("basic"),
Bytes.toBytes("ArticleTypeID"), CompareOp.EQUAL, Bytes.toBytes(1L));
list.addFilter(filter1);
Scan s = new Scan();
s.addFamily(Bytes.toBytes("basic"));
s.setFilter(list);
ResultScanner scanner = table.getScanner(s);
int num = 0;
for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
num++;
}
System.out.println("Found row: " + num);// 预期 50,结果为 0

问题出现:使用 java 期望的查询结果为 50 条,但是查出的结果却是 0 条!

使用 python 查询却可以得到正确的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def recent_events_v1(start, end, table=None, filter_str=None, limit=2000):
with get_connetion_pool().connection() as conn:
if table is not None:
t = conn.table(table)
else:
t = conn.table(TABLE)
start_row = 'ARTICLE' + str(start * 1000000)
end_row = 'ARTICLE' + str(end * 1000000)
return t.scan(row_start=start_row, row_stop=end_row, filter=filter_str, limit=limit)
if __name__ == '__main__':
filter_str = "SingleColumnValueFilter('basic', 'ArticleTypeID', =, 'binary:1')"
results = recent_events_v1(start=0, end=1505023900, filter_str=filter_str)
print len([i for i in results]) # 期望值为50, 实际值为 50,正确

寻找原因

经过 N 次确认,java 的读操作是没有问题的,python 实现的读写也得到了预期的效果。进一步探究,特意用 java 完整的实现的数据的导入和查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void test_hbase_filter1() throws IOException {
tableName = TableName.valueOf("test_article_java_1");
table = conn.getTable(tableName);
System.out.println("Prepare create table !");
Admin admin = conn.getAdmin();
if (!admin.tableExists(tableName)) {
HTableDescriptor td = new HTableDescriptor(tableName);
HColumnDescriptor basic = new HColumnDescriptor("basic");
td.addFamily(basic);
admin.createTable(td);
System.out.println("Created !");
}
// Put value to test_article_java_1
System.out.println("Prepare to write data to: " + table.getName().toString());
for (int i = 0; i < 100; i++) {
Put p = new Put(Bytes.toBytes("ARTICLE" + (System.currentTimeMillis() + 1000) * 1000));
p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("ArticleTypeID"), Bytes.toBytes(Long.valueOf(i % 2)));
table.put(p);
}
// scan test_article_java_1
scanner = table.getScanner(s);
num = 0;
for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
num++;
}
System.out.println("Found row: " + num);// 预期 50,结果为 50
}

可见,用 java 写的数据,用 java 读是没问题的,用 python 写的数据用 python 读也没问题。但 java 读 python 写的数据就存在异常,难道是 python 写的数据和 java 写的数据不一样?为此分别对比一下 python 和 java 写入 hbase 的数据:

图 2:java 写入的数据

仔细观察图 1 和图 2 中的数据可以发现,python 写入的数据中对应的 ArticleTypeID 值为 01,而 java 则是一串 bytes。突然意识到一个问题,hbase 读写的时候要求传入的数据类型为 bytes,而使用 python 传输的过程中这种整形数据是直接通过 str() 方法转成字符串存储到 hbase 中的,并不是以 bytes 的形式存于 hbase,所以使用 java 用转化成 bytes 的 filter 读才没能得到预期的结果。

正确的 filter 姿势

既然找到了原因,解决问题就比较简单了,存储的时候将整型数据全部都通过 struct.pack 方法转成 bytes 存入,这样就可以被通用的查询了,同时 使用 python 查询的时候也将 filter 中的整型数值替换成 bytes 格式。

使用 struct.pack 方法将整型转成 bytes 时,注意选择使用 big-endian 的 Byte order,即 pack 方法的第一个参数使用 >。因为 java 官方 client 采用这种字节序,下面是 Bytes.toBytes 的实现源码,可见采用的是 big-endian

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Convert a long value to a byte array using big-endian.
*
* @param val value to convert
* @return the byte array
*/
public static byte[] toBytes(long val) {
byte [] b = new byte[8];
for (int i = 7; i > 0; i--) {
b[i] = (byte) val;
val >>>= 8;
}
b[0] = (byte) val;
return b;

正确的 python 写入 hbase 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
def save_main_v2():
datas = dict()
for i in range(100):
article_type_id = i % 2
timestamp = time.time() + i
rowkey = "ARTICLE" + str(timestamp * 1000000)
data = {
"basic:" + "ArticleID": str(i),
"basic:" + "ArticleTypeID": struct.pack('>q', article_type_id),
"basic:" + "Created": str(timestamp),
}
datas[rowkey] = data
save_batch_events(datas, table="test_article_2")

查询是的filter:

1
filter_str = "SingleColumnValueFilter('basic', 'ArticleTypeID', =, 'binary:{value}')".format(value=struct.pack('>q', 1))

这样就没有问题了~

总结

使用 python 读写 hbase 数据,直接传输整型参数时,hbase 的 thrift 接口会抛出 TDecodeException: Field 'value(3)' of 'Mutation' needs type 'STRING' 异常,被告知只接受 string 类型的数据。这时注意将整型数据转化成 bytes 形式的 str,而不要直接使用 str() 方法强转,否则难以避免的会出现一些非预期的结果。

以为这样就没问题了? 请关注看下文~