Background #
A new ES index has been created, and some data needs to be imported into it from TiDB.
- ES index name: index_test
- TiDB address: 127.0.0.1:4000
- ES addresses: 127.0.0.1:9200, 127.0.0.1:9201, 127.0.0.1:9202
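Download mysql-connector-java-8.0.21.jar #
TiDB speaks the MySQL protocol, so Logstash can connect to it with the MySQL JDBC driver.
Download link: https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.21/mysql-connector-java-8.0.21.jar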
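After downloading, it may be worth checking that the jar arrived intact. The sketch below compares a file's SHA-1 digest with an expected value; Maven Central normally publishes one next to each artifact at the same URL with `.sha1` appended. The helper names `sha1_of` and `sha1_matches` are hypothetical, and the sketch assumes either `sha1sum` or `shasum` is installed.

```shell
#!/bin/sh
# sha1_of and sha1_matches are hypothetical helper names used for illustration.

# Print the SHA-1 hex digest of a file, using whichever tool is installed.
sha1_of() {
    if command -v sha1sum >/dev/null 2>&1; then
        sha1sum "$1" | awk '{print $1}'
    else
        shasum -a 1 "$1" | awk '{print $1}'
    fi
}

# Succeed only if the file's digest equals the expected hex string.
sha1_matches() {
    [ "$(sha1_of "$1")" = "$2" ]
}
```

A possible use: fetch the jar with `curl -fLO <download link>`, read the expected digest from `<download link>.sha1`, then run `sha1_matches mysql-connector-java-8.0.21.jar <expected digest>` and only continue if it succeeds.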
Create the Logstash configuration file #
tidb-to-es.conf
input {
  jdbc {
    jdbc_driver_library => '/path/to/mysql-connector-java-8.0.21.jar'
    jdbc_driver_class => 'com.mysql.cj.jdbc.Driver'
    jdbc_connection_string => 'jdbc:mysql://127.0.0.1:4000/LawDB?useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true&autoReconnect=true'
    jdbc_user => 'root'
    jdbc_password => '[password]'
    jdbc_validate_connection => true
    # Uncomment to let Logstash page through the result set itself
    # jdbc_paging_enabled => 'true'
    # jdbc_page_size => '100'
    jdbc_default_timezone => 'Asia/Shanghai'
    statement => 'select id,title,content from [your table] where id <= 10000'
  }
}
filter {
  mutate {
    # Drop the metadata fields Logstash adds to every event
    remove_field => ['@version', '@timestamp']
  }
}
output {
  # Print every event to the console for debugging
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    # If the cluster has TLS enabled (the default in Elasticsearch 8), these URLs would need https://
    hosts => [ 'http://127.0.0.1:9200', 'http://127.0.0.1:9201', 'http://127.0.0.1:9202' ]
    index => 'index_test'
    # 'create' rejects ids that already exist instead of overwriting them
    action => 'create'
    document_id => '%{id}'
    cacert => '/home/[USER]/elasticsearch-8.5.3/config/certs/http_ca.crt'
    user => "elastic"
    password => "[elastic password]"
  }
}
Start the import #
logstash -f /path/to/tidb-to-es.conf
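Before a long import it can help to check the config syntax first, and afterwards to compare the document count in the index with the row count in the table. A minimal sketch, assuming `logstash` and `curl` are on `PATH`; the helper names `check_config` and `count_url` are hypothetical, and a cluster with security enabled would additionally need `https`, `--cacert`, and `-u elastic` on the `curl` call.

```shell
#!/bin/sh
# Hypothetical helpers; the names are illustrative, not part of Logstash or Elasticsearch.

# Parse the pipeline config and exit without running it.
# --config.test_and_exit is a standard Logstash command-line flag.
check_config() {
    logstash -f "$1" --config.test_and_exit
}

# Build the URL of the Elasticsearch _count API for an index.
# $1 = base URL of one node (a trailing slash is tolerated), $2 = index name
count_url() {
    printf '%s/%s/_count\n' "${1%/}" "$2"
}
```

For example, `check_config /path/to/tidb-to-es.conf` before the import, then `curl -s "$(count_url http://127.0.0.1:9200 index_test)"` afterwards; the JSON response carries the number of documents in its `count` field.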
Paging #
When the data volume is large, Logstash can page through the result set automatically:
jdbc_paging_enabled => 'true'
jdbc_page_size => '100'
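However, the LIMIT/OFFSET paging queries that Logstash generates are inefficient on large tables, so I wrote a script that pages by id instead: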
paging.sh
#!/usr/bin/env zsh
# Usage: paging.sh <first page> <last page>  (both inclusive)
table_name="[your table]"
index_name="[your index]"
limit=10000  # rows per page
for i in {$1..$2}; do
  # If ./stop exists, record the command that resumes from page $i and exit
  [[ -e "./stop" ]] && echo "$0 $i $2" >> "./stop" && exit 0
  # Page $i starts just after id = offset
  offset=$(( $i * $limit ))
  text="input {
  jdbc {
    jdbc_driver_library => '/path/to/mysql-connector-java-8.0.21.jar'
    jdbc_driver_class => 'com.mysql.cj.jdbc.Driver'
    jdbc_connection_string => 'jdbc:mysql://127.0.0.1:4000/LawDB?useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true&autoReconnect=true'
    jdbc_user => 'root'
    jdbc_password => '[passwd]'
    jdbc_validate_connection => true
    # jdbc_paging_enabled => 'true'
    # jdbc_page_size => '10000'
    jdbc_default_timezone => 'Asia/Shanghai'
    statement => 'select id,section_head,section_party,section_litigation,section_truth,section_reason,section_result from $table_name where id > $offset order by id limit $limit'
  }
}
filter {
  mutate {
    remove_field => ['@version', '@timestamp']
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => [ 'http://127.0.0.1:9200', 'http://127.0.0.1:9201', 'http://127.0.0.1:9202' ]
    index => '$index_name'
    action => 'create'
    document_id => '%{id}'
  }
}
"
  # A separate --path.data per range keeps runs over different ranges from sharing a data directory
  /path/to/logstash -w 10 --path.data "/tmp/logstash-$1-$2" -e "$text"
done
Import 1 million rows (pages 0 through 99 at 10,000 rows each; the page range is inclusive):
paging.sh 0 99
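To see which id windows a given page range covers before starting Logstash, the arithmetic from paging.sh can be run on its own. The sketch below is a dry run only: `page_statement` and `preview_pages` are hypothetical helper names, the column list is shortened to `id`, and the windows line up with distinct rows only under the assumption that ids are dense and start at 1. If ids have gaps, neighbouring pages would overlap; with `action => 'create'` the repeated ids should be rejected rather than overwritten.

```shell
#!/bin/sh
# Dry run: print the SQL for each page without starting Logstash.
# page_statement and preview_pages are hypothetical helpers that mirror paging.sh.
limit=10000  # rows per page, same value as in paging.sh

# $1 = table name, $2 = page index
page_statement() {
    offset=$(( $2 * limit ))
    printf 'select id from %s where id > %d order by id limit %d\n' "$1" "$offset" "$limit"
}

# $1 = table name, $2 = first page, $3 = last page (inclusive)
preview_pages() {
    i=$2
    while [ "$i" -le "$3" ]; do
        page_statement "$1" "$i"
        i=$(( i + 1 ))
    done
}
```

For instance, `preview_pages '[your table]' 0 99` prints 100 statements, the first starting after id 0 and the last after id 990000, which together cover ids 1 to 1,000,000 when ids are dense.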
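To pause the import, create an empty file named stop in the directory the script is run from. The script checks for it before starting each page, so the page in progress finishes first, and the command needed to resume is appended to the file: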
touch stop
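When paging.sh sees the stop file it appends a line of the form `<script> <next page> <last page>` before exiting, so the file itself records where to pick up. A small sketch for reading that line back; `resume_command` is a hypothetical helper name, and it assumes the last non-empty line of the file is the one paging.sh wrote.

```shell
#!/bin/sh
# resume_command is a hypothetical helper: it prints the last non-empty line
# of the stop file, which paging.sh wrote as "<script> <next page> <last page>".
# $1 = path to the stop file
resume_command() {
    grep -v '^[[:space:]]*$' "$1" | tail -n 1
}
```

One way to use it: run `resume_command ./stop` to see the recorded command, delete the stop file with `rm ./stop` (otherwise the script exits again immediately), then rerun the printed command. The recorded page is the one that had not started yet, so no page is skipped.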