jaceklaskowski gitbooks apache-kafka
1. The program's entry point is the main function of kafka.Kafka:
    def main(args: Array[String]): Unit = {
      try {
        val serverProps = getPropsFromArgs(args) // read server.properties
        val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps) // create the KafkaServerStartable

        try {
          if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
            new LoggingSignalHandler().register()
        } catch {
          case e: ReflectiveOperationException =>
            warn("Failed to register optional signal handler that logs a message when the process is terminated " +
              s"by a signal. Reason for registration failure is: $e", e)
        }

        // attach shutdown handler to catch terminating signals as well as normal termination
        Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown)

        kafkaServerStartable.startup()
        kafkaServerStartable.awaitShutdown()
      }
      catch {
        case e: Throwable =>
          fatal("Exiting Kafka due to fatal exception", e)
          Exit.exit(1)
      }
      Exit.exit(0)
    }

1.1 getPropsFromArgs(args)

    // Kafka.scala
    def getPropsFromArgs(args: Array[String]): Properties = {
      ...
      val props = Utils.loadProps(args(0))

At startup, config/server.properties is passed in as the program argument:
[Screenshot: startup arguments]
Utils.loadProps returns a Properties object:
    // Utils.java
    public static Properties loadProps(String filename) throws IOException {
        return loadProps(filename, null);
    }

[Screenshot: reading the parameters]
This function reads the settings in config/server.properties into a Properties object for later use.
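As a rough illustration of what loading server.properties amounts to, here is a minimal Java sketch built directly on java.util.Properties. It is not Kafka's Utils.loadProps: the two-argument overload that method delegates to is not reproduced, and the class name PropsLoader is hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper, not Kafka's Utils class.
final class PropsLoader {
    private PropsLoader() {}

    // Read a key=value file such as config/server.properties into a Properties object.
    static Properties loadProps(String filename) throws IOException {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get(filename))) {
            props.load(in); // skips '#' comments and blank lines, parses key=value pairs
        }
        return props;
    }
}
```

For example, `PropsLoader.loadProps("config/server.properties").getProperty("log.dirs")` would return the configured log directory.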
1.2 KafkaServerStartable.fromProps(serverProps)

    object KafkaServerStartable {
      def fromProps(serverProps: Properties): KafkaServerStartable = {
        fromProps(serverProps, None)
      }

      def fromProps(serverProps: Properties, threadNamePrefix: Option[String]): KafkaServerStartable = {
        val reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps)) // seems to do little here
        new KafkaServerStartable(KafkaConfig.fromProps(serverProps, false), reporters, threadNamePrefix)
      }
    }

This function does two things:
1. creates the reporters;
2. creates the KafkaServerStartable.
Stepping through in the debugger up to this point:
[Screenshot: reporters is empty]
reporters is an empty sequence, so we skip this variable.
Next, KafkaServerStartable. It holds a KafkaServer and defines thin startup, shutdown and similar methods, all of which simply operate on that server field:
    class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging {
      private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix)

      def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)

      def startup(): Unit = {
        try server.startup()
        catch {
          case _: Throwable =>
            // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
            fatal("Exiting Kafka.")
            Exit.exit(1)
        }
      }

      def shutdown(): Unit = {
        try server.shutdown()
        catch {
          case _: Throwable =>
            fatal("Halting Kafka.")
            // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
            Exit.halt(1)
        }
      }

      /**
       * Allow setting broker state from the startable.
       * This is needed when a custom kafka server startable want to emit new states that it introduces.
       */
      def setServerState(newState: Byte): Unit = {
        server.brokerState.newState(newState)
      }

      def awaitShutdown(): Unit = server.awaitShutdown()
    }

1.3 Starting KafkaServerStartable

The last two calls in main's try block are:
    kafkaServerStartable.startup()
    kafkaServerStartable.awaitShutdown()

As the class above shows, both calls are simply delegated to KafkaServer.
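The delegation pattern in KafkaServerStartable can be sketched in plain Java. This is a minimal sketch, not Kafka's code. Server and StartableSketch are hypothetical names, and the injected IntConsumer callbacks stand in for Exit.exit and Exit.halt so that the behavior can be exercised without terminating the JVM.

```java
import java.util.function.IntConsumer;

// Hypothetical stand-in for KafkaServer.
interface Server {
    void startup();
    void shutdown();
}

// Sketch of the KafkaServerStartable idea: a thin wrapper that delegates to a server
// and turns a startup or shutdown failure into a process exit status.
final class StartableSketch {
    private final Server server;
    private final IntConsumer exit; // stands in for Exit.exit(1) on startup failure
    private final IntConsumer halt; // stands in for Exit.halt(1) on shutdown failure

    StartableSketch(Server server, IntConsumer exit, IntConsumer halt) {
        this.server = server;
        this.exit = exit;
        this.halt = halt;
    }

    void startup() {
        try {
            server.startup();
        } catch (Throwable t) {
            // The server is assumed to have cleaned up after itself; only the exit status is set here.
            exit.accept(1);
        }
    }

    void shutdown() {
        try {
            server.shutdown();
        } catch (Throwable t) {
            // Mirrors the original comment: calling exit() here can deadlock, so force a halt.
            halt.accept(1);
        }
    }
}
```

Keeping the wrapper this thin means all real lifecycle logic stays in the server class, and the wrapper only decides how the process terminates.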
First, let's look at how kafkaServerStartable.awaitShutdown() is implemented:
    // KafkaServerStartable.scala
    def awaitShutdown(): Unit = server.awaitShutdown()

    // KafkaServer.scala
    private var shutdownLatch = new CountDownLatch(1)
    ...
    def awaitShutdown(): Unit = shutdownLatch.await()

It simply waits on a CountDownLatch: the main thread blocks here until KafkaServer.shutdown() counts the latch down.
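The awaitShutdown/shutdown handshake can be sketched with java.util.concurrent.CountDownLatch. The sketch assumes that shutdown() counts the latch down once its cleanup is finished. LatchServer is a hypothetical name, and the timed overload exists only to make the sketch testable.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class LatchServer {
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    // Blocks the caller (in Kafka, the main thread) until shutdown() has completed.
    void awaitShutdown() throws InterruptedException {
        shutdownLatch.await();
    }

    // Timed variant: returns true if shutdown completed within the timeout.
    boolean awaitShutdown(long timeout, TimeUnit unit) throws InterruptedException {
        return shutdownLatch.await(timeout, unit);
    }

    void shutdown() {
        // ... stop components here ...
        shutdownLatch.countDown(); // releases every thread waiting in awaitShutdown()
    }
}
```

In Kafka's main, the hook registered with Exit.addShutdownHook calls kafkaServerStartable.shutdown. That call is what eventually releases the main thread parked in awaitShutdown().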
Next, we focus on KafkaServer's startup method.
KafkaServer::startup

    /**
     * Start up API for bringing up a single instance of the Kafka server.
     * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
     */
    def startup(): Unit = {
      ...
    }

The function is long, over 150 lines. According to its comment, it brings up a Kafka server instance, and it does three main things: it instantiates the LogManager, the SocketServer and the request handlers (KafkaRequestHandlers).
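initZkClient

Let's start with this call. As the comment and the function name make clear, it initializes the ZooKeeper client and applies some configuration.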
    // KafkaServer.scala
    /* setup zookeeper */
    initZkClient(time)

Following the call chain:
    private def initZkClient(time: Time): Unit = {
      info(s"Connecting to zookeeper on ${config.zkConnect}")

      def createZkClient(zkConnect: String, isSecure: Boolean) = {
        KafkaZkClient(zkConnect, isSecure, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
          config.zkMaxInFlightRequests, time, name = Some("Kafka server"), zkClientConfig = Some(zkClientConfig))
      }
      ...
      _zkClient = createZkClient(config.zkConnect, secureAclsEnabled)
    }

    // KafkaZkClient.scala
    def apply(connectString: String, ...) = {
      val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
        time, metricGroup, metricType, name, zkClientConfig)
      new KafkaZkClient(zooKeeperClient, isSecure, time)
    }

KafkaZkClient::apply first creates a ZooKeeperClient and then uses it to create a KafkaZkClient. The comments on the two classes explain the split: ZooKeeperClient handles the interaction with ZooKeeper, while KafkaZkClient is the Kafka-specific ZooKeeper client built on top of it.
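The two-layer construction in KafkaZkClient.apply can be sketched as follows. All names here are hypothetical placeholders. The real ZooKeeperClient opens a session through org.apache.zookeeper.ZooKeeper; this sketch deliberately leaves that out so it runs without a ZooKeeper ensemble and shows only the "build the generic client, then wrap it" shape.

```java
// Hypothetical low-level layer: owns connection settings (the real one also owns a ZooKeeper session).
final class LowLevelZkClient {
    final String connectString;
    final int sessionTimeoutMs;
    final int connectionTimeoutMs;

    LowLevelZkClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs) {
        this.connectString = connectString;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.connectionTimeoutMs = connectionTimeoutMs;
    }
}

// Hypothetical Kafka-specific layer: wraps the generic client and adds Kafka-level settings.
final class KafkaZkClientSketch {
    final LowLevelZkClient zooKeeperClient;
    final boolean isSecure;

    private KafkaZkClientSketch(LowLevelZkClient zooKeeperClient, boolean isSecure) {
        this.zooKeeperClient = zooKeeperClient;
        this.isSecure = isSecure;
    }

    // Same shape as KafkaZkClient.apply: build the generic client first, then wrap it.
    static KafkaZkClientSketch create(String connectString, boolean isSecure,
                                      int sessionTimeoutMs, int connectionTimeoutMs) {
        LowLevelZkClient low = new LowLevelZkClient(connectString, sessionTimeoutMs, connectionTimeoutMs);
        return new KafkaZkClientSketch(low, isSecure);
    }
}
```

Keeping the session handling in the lower layer lets the Kafka-specific layer focus on Kafka's own data. This is the division of labor that the two class comments describe.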
Internally, ZooKeeperClient holds an org.apache.zookeeper.ZooKeeper instance, which is the client Kafka actually uses to talk to ZooKeeper:
    // ZooKeeperClient.scala
    private[zookeeper] val expiryScheduler = new KafkaScheduler(threads = 1, "zk-session-expiry-handler")
    ...
    // Fail-fast if there's an error during construction (so don't call initialize, which retries forever)
    @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig)

LogManager

The comment on KafkaServer::startup says that startup instantiates the LogManager. The following code in the function starts it:
    /* start log manager */
    logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
    logManager.startup()

For more on what this class does, see the gitbook page kafka-log-LogManager.html and the article "Kafka LogManager详解(六)" ("Kafka LogManager in detail, part 6").
Let's look at the class comment, which says:
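LogManager is the entry point to Kafka's log management subsystem and is responsible for creating, retrieving and cleaning logs. All reads and writes are delegated to the individual Log instances. LogManager keeps logs in one or more directories, and a new log is created in the data directory that currently holds the fewest logs. Partitions are never moved after the fact, and there is no balancing by size or I/O rate. A background thread handles log retention by periodically truncating excess log segments. The original comment:

    /**
     * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
     * All read and write operations are delegated to the individual log instances.
     *
     * The log manager maintains logs in one or more directories. New logs are created in the data directory
     * with the fewest logs. No attempt is made to move partitions after the fact or balance based on
     * size or I/O rate.
     *
     * A background thread handles log retention by periodically truncating excess log segments.
     */
    @threadsafe
    class LogManager(logDirs: Seq[File], ...

LogManager's first parameter, logDirs, specifies where logs are stored. In the debugger its value is /tmp/kafka-logs, which is the log.dirs value in the stock config/server.properties.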
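Two behaviors from that comment can be illustrated with small Java sketches:

- choosing the data directory with the fewest logs for a new log;
- size-based retention that drops the oldest segments.

Both are simplified assumptions, not LogManager's code. Log counts and segment sizes are passed in as plain collections, LogDirPolicy is a hypothetical class, and real Kafka retention has more rules (for example time-based limits via log.retention.hours) and a more careful size condition.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Hypothetical policy helpers illustrating the LogManager comment.
final class LogDirPolicy {
    private LogDirPolicy() {}

    // Pick the data directory currently holding the fewest logs (ties: first in iteration order).
    static String pickLogDir(Map<String, Integer> logCountByDir) {
        String best = null;
        int bestCount = Integer.MAX_VALUE;
        for (Map.Entry<String, Integer> e : logCountByDir.entrySet()) {
            if (e.getValue() < bestCount) {
                best = e.getKey();
                bestCount = e.getValue();
            }
        }
        if (best == null) throw new IllegalArgumentException("no log directories");
        return best;
    }

    // Simplified size-based retention: drop the oldest segments until the total fits within
    // retentionBytes. Input is ordered oldest first; returns the sizes of the segments kept.
    static List<Long> applyRetention(List<Long> segmentSizesOldestFirst, long retentionBytes) {
        Deque<Long> segments = new ArrayDeque<>(segmentSizesOldestFirst);
        long total = 0;
        for (long size : segments) total += size;
        while (total > retentionBytes && !segments.isEmpty()) {
            total -= segments.removeFirst(); // the oldest segment goes first
        }
        return new ArrayList<>(segments);
    }
}
```

Placing new logs by count rather than by bytes is cheap and predictable. It also explains the comment's caveat: directories can still end up unbalanced in size or I/O, because nothing moves partitions later.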
[Screenshot: log directory path]
The directory contains demo_topic, a topic I created earlier, along with its logs.
[Screenshot: log folder]
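Kafka stores each partition's log in a directory named after its topic and partition number, for example demo_topic-0 for partition 0 of demo_topic. The minimal sketch below splits such a name. It assumes the partition number follows the last '-', since topic names may themselves contain '-'. PartitionDirName is a hypothetical helper.

```java
// Hypothetical helper: parse a partition directory name such as "demo_topic-0".
final class PartitionDirName {
    final String topic;
    final int partition;

    private PartitionDirName(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    // Split at the LAST '-', because the topic part may itself contain '-'.
    static PartitionDirName parse(String dirName) {
        int idx = dirName.lastIndexOf('-');
        if (idx <= 0 || idx == dirName.length() - 1) {
            throw new IllegalArgumentException("not a topic-partition directory: " + dirName);
        }
        // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) for a non-numeric suffix.
        return new PartitionDirName(dirName.substring(0, idx),
                Integer.parseInt(dirName.substring(idx + 1)));
    }
}
```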
SocketServer

Another job of KafkaServer::startup is starting the SocketServer, which the following code does:
    // Create and start the socket server acceptor threads so that the bound port is known.
    // Delay starting processors until the end of the initialization sequence to ensure
    // that credentials have been loaded before processing authentications.
    socketServer = new SocketServer(config, metrics, time, credentialProvider)
    socketServer.startup(startupProcessors = false)

The class's source shows that SocketServer handles new connections, requests and responses to and from the broker. Kafka supports two types of request planes:
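- Data plane: handles requests from clients and from other brokers in the cluster. Its threading model is:
  - one Acceptor thread per listener, which handles new connections;
  - each Acceptor has N Processor threads, each with its own selector, which read requests from the sockets;
  - M Handler threads, which process requests and hand the responses back to the Processor threads for writing.
  Multiple data planes can be configured by listing several comma-separated endpoints under "listeners" in KafkaConfig.
- Control plane: handles requests from the controller. It is optional and is configured by setting "control.plane.listener.name". If it is not set, controller requests are handled by the data plane. Its threading model is:
  - one Acceptor thread, which handles new connections;
  - the Acceptor has one Processor thread with its own selector, which reads requests from the socket;
  - one Handler thread, which processes requests and hands the responses back to the Processor thread for writing.

Anyone familiar with the Reactor pattern will recognize Acceptor, Processor and Handler at once; Kafka's network model is clear at a glance.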
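The data-plane threading model (Acceptor, N Processors, M Handlers joined by a shared request queue) can be imitated with in-memory queues. This is a deliberately simplified sketch:

- there are no sockets or selectors;
- the Acceptor is reduced to round-robin assignment of connections to processors;
- the processor's read and write sides are plain method calls;
- ReactorSketch, read and write are hypothetical names.

It assumes, as the description above implies, one request queue shared by all handlers and one response queue per processor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class ReactorSketch {
    // A request remembers which processor read it, so the response can be routed back.
    static final class Request {
        final int processorId;
        final String payload;

        Request(int processorId, String payload) {
            this.processorId = processorId;
            this.payload = payload;
        }
    }

    private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>(); // shared by all handlers
    private final List<BlockingQueue<String>> responseQueues = new ArrayList<>();   // one per processor
    private final ExecutorService handlers;
    private int nextProcessor = 0;

    ReactorSketch(int numProcessors, int numHandlers) {
        for (int i = 0; i < numProcessors; i++) responseQueues.add(new LinkedBlockingQueue<>());
        handlers = Executors.newFixedThreadPool(numHandlers);
        for (int i = 0; i < numHandlers; i++) {
            handlers.execute(() -> {
                try {
                    while (true) {
                        Request r = requestQueue.take(); // M handler threads compete for requests
                        responseQueues.get(r.processorId).put("handled:" + r.payload);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop when shutdown() interrupts us
                }
            });
        }
    }

    // Acceptor role: hand each new connection to the next processor, round-robin.
    synchronized int accept() {
        int p = nextProcessor;
        nextProcessor = (nextProcessor + 1) % responseQueues.size();
        return p;
    }

    // Processor role, read side: pass a request read from a connection to the handlers.
    void read(int processorId, String payload) {
        requestQueue.add(new Request(processorId, payload));
    }

    // Processor role, write side: take the next response to write back (null on timeout).
    String write(int processorId) {
        try {
            return responseQueues.get(processorId).poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void shutdown() {
        handlers.shutdownNow(); // interrupts handler threads blocked in take()
    }
}
```

Because responses are keyed by processor id, a handler never touches a connection directly. Only the processor that owns the connection writes the response back, which matches the "back to the processor threads for writing" step described above.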
About the Kafka controller
    /**
     * Handles new connections, requests and responses to and from broker.
     * Kafka supports two types of request planes :
     *  - data-plane :
     *    - Handles requests from clients and other brokers in the cluster.
     *    - The threading model is
     *      1 Acceptor thread per listener, that handles new connections.
     *      It is possible to configure multiple data-planes by specifying multiple "," separated endpoints for "listeners" in KafkaConfig.
     *      Acceptor has N Processor threads that each have their own selector and read requests from sockets
     *      M Handler threads that handle requests and produce responses back to the processor threads for writing.
     *  - control-plane :
     *    - Handles requests from controller. This is optional and can be configured by specifying "control.plane.listener.name".
     *      If not configured, the controller requests are handled by the data-plane.
     *    - The threading model is
     *      1 Acceptor thread that handles new connections
     *      Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
     *      1 Handler thread that handles requests and produce responses back to the processor thread for writing.
     */
    class SocketServer(val config: KafkaConfig,
                       val metrics: Metrics,
                       val time: Time,
                       val credentialProvider: CredentialProvider)
      extends Logging with KafkaMetricsGroup with BrokerReconfigurable {

We will dig deeper into these components later.
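Summary

We took a quick tour of Kafka's startup sequence and met LogManager and SocketServer, which relate to Kafka's log management and its network model respectively. These two areas are what we will study closely next.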
--- From the Tencent Cloud community: 平凡的学生族 ---