/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.log

import java.util.Arrays
import java.security.MessageDigest
import java.nio.ByteBuffer
import kafka.utils._

/**
 * A fixed-capacity mapping from message keys (given as byte buffers) to offsets.
 */
trait OffsetMap {
  /** The maximum number of entries the map can hold. */
  def slots: Int
  /** Associate `offset` with `key`. */
  def put(key: ByteBuffer, offset: Long): Unit
  /** Look up the offset associated with `key`. */
  def get(key: ByteBuffer): Long
  /** Remove all entries. */
  def clear(): Unit
  /** The number of entries currently stored. */
  def size: Int
  /** The fraction of slots in use: `size / slots`. */
  def utilization: Double = size.toDouble / slots
}

/**
 * A hash table used for deduplicating the log. This hash table uses a cryptographically secure hash of the key as a proxy for the key
 * for comparisons and to save space on object overhead. Collisions are resolved by probing. This hash table does not support deletes.
 *
 * Each slot occupies `bytesPerEntry` bytes of a single buffer: the key's hash followed by an 8-byte offset.
 * A slot whose bytes are all zero is treated as empty (the buffer starts zeroed and `clear()` re-zeroes it),
 * so an entry whose hash and offset are both all-zero would be indistinguishable from an empty slot.
 *
 * @param memory The amount of memory this map can use
 * @param hashAlgorithm The hash algorithm instance to use: MD2, MD5, SHA-1, SHA-256, SHA-384, SHA-512
 */
@nonthreadsafe
class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extends OffsetMap {
  private val bytes = ByteBuffer.allocate(memory)

  /* the hash algorithm instance to use, default is MD5 */
  private val digest = MessageDigest.getInstance(hashAlgorithm)

  /* the number of bytes for this hash algorithm */
  private val hashSize = digest.getDigestLength

  /* create some hash buffers to avoid reallocating each time */
  private val hash1 = new Array[Byte](hashSize)
  private val hash2 = new Array[Byte](hashSize)

  /* number of entries put into the map */
  private var entries = 0

  /* number of lookups on the map */
  private var lookups = 0L

  /* the number of probes for all lookups */
  private var probes = 0L

  /* The largest probe attempt index at which any entry has been inserted since the last clear().
   * There are no deletes, so a stored key is always found within attempts 0..longestProbe. This
   * bounds searches for absent keys, which would otherwise never end once the map is full. */
  private var longestProbe = 0

  /**
   * The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset)
   */
  val bytesPerEntry: Int = hashSize + 8

  /**
   * The maximum number of entries this map can contain
   */
  val slots: Int = memory / bytesPerEntry

  /**
   * Associate this offset to the given key. If an entry with the same key hash already exists its offset is
   * overwritten and the size does not change; otherwise a new entry is added.
   * @param key The key (hashed from its position to its limit; its position is left unchanged)
   * @param offset The offset
   * @throws IllegalArgumentException if the key is not already present and the map is full
   */
  override def put(key: ByteBuffer, offset: Long): Unit = {
    // with no slots at all there is nothing to probe (positionOf would divide by zero)
    require(slots > 0, "Attempt to add a new entry to a full offset map.")
    lookups += 1
    hashInto(key, hash1)
    // probe until we find either this key's existing entry or the first empty slot
    var attempt = 0
    var pos = positionOf(hash1, attempt)
    while (!isEmpty(pos)) {
      bytes.position(pos)
      bytes.get(hash2)
      if (Arrays.equals(hash1, hash2)) {
        // we found an existing entry, overwrite it and return (size does not change)
        bytes.putLong(offset)
        return
      }
      attempt += 1
      // Past longestProbe the key cannot be present. If the map is also full there is no empty slot
      // left to find, so fail here instead of probing forever.
      require(attempt <= longestProbe || entries < slots, "Attempt to add a new entry to a full offset map.")
      pos = positionOf(hash1, attempt)
    }
    // found an empty slot, update it--size grows by 1
    bytes.position(pos)
    bytes.put(hash1)
    bytes.putLong(offset)
    entries += 1
    longestProbe = math.max(longestProbe, attempt)
  }

  /**
   * Check that there is no entry at the given position, i.e. every byte of the slot is zero.
   * Works for any digest length: whole 8-byte words are compared first, then any remaining bytes.
   */
  private def isEmpty(position: Int): Boolean = {
    val end = position + bytesPerEntry
    var i = position
    while (i + 8 <= end) {
      if (bytes.getLong(i) != 0L)
        return false
      i += 8
    }
    while (i < end) {
      if (bytes.get(i) != 0)
        return false
      i += 1
    }
    true
  }

  /**
   * Get the offset associated with this key.
   * @param key The key (hashed from its position to its limit; its position is left unchanged)
   * @return The offset associated with this key or -1 if the key is not found
   */
  override def get(key: ByteBuffer): Long = {
    // a map without slots holds nothing (and positionOf would divide by zero)
    if (slots == 0)
      return -1L
    lookups += 1
    hashInto(key, hash1)
    // Probe until we find the hash we are looking for or an empty slot. Every stored key sits at an
    // attempt <= longestProbe, so giving up after that is exact and also terminates on a full map.
    var attempt = 0
    while (attempt <= longestProbe) {
      val pos = positionOf(hash1, attempt)
      if (isEmpty(pos))
        return -1L
      bytes.position(pos)
      bytes.get(hash2)
      if (Arrays.equals(hash1, hash2))
        return bytes.getLong()
      attempt += 1
    }
    -1L
  }

  /**
   * Remove all entries by zeroing the whole backing buffer, and reset the entry count and the lookup/probe statistics.
   */
  override def clear(): Unit = {
    this.entries = 0
    this.lookups = 0L
    this.probes = 0L
    this.longestProbe = 0
    Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit, 0.toByte)
  }

  /**
   * The number of distinct key hashes stored since the last clear()
   */
  override def size: Int = entries

  /**
   * The rate of collisions in the lookups: the average number of extra probes per lookup,
   * or 0.0 if no lookups have been made yet.
   */
  def collisionRate: Double =
    if (this.lookups == 0L) 0.0
    else (this.probes - this.lookups) / this.lookups.toDouble

  /**
   * Calculate the ith probe position. We first try reading successive integers from the hash itself
   * (starting at byte offsets 0, 1, ..., hashSize - 4), then if all of those fail we degrade to linear
   * probing from the last of them. Every call counts as one probe.
   * @param hash The hash of the key to find the position for
   * @param attempt The ith probe
   * @return The byte offset in the buffer at which the ith probing for the given hash would reside
   */
  private def positionOf(hash: Array[Byte], attempt: Int): Int = {
    val probe = Utils.readInt(hash, math.min(attempt, hashSize - 4)) + math.max(0, attempt - hashSize + 4)
    val slot = Utils.abs(probe) % slots
    this.probes += 1
    slot * bytesPerEntry
  }

  /**
   * Hash the bytes of the key between its position and its limit into the given buffer.
   * @param key The key to hash; its position is restored afterwards
   * @param buffer The buffer to store the hash into (must hold at least hashSize bytes)
   */
  private def hashInto(key: ByteBuffer, buffer: Array[Byte]): Unit = {
    // Save and restore the position explicitly. mark()/reset() would silently discard any mark the caller had set.
    val start = key.position()
    digest.update(key)
    key.position(start)
    digest.digest(buffer, 0, hashSize)
  }
}