Batch import
Batch imports are an efficient way to add multiple data objects and cross-references.
Additional information
To create a bulk import job, follow these steps:
- Initialize a batch object.
- Add items to the batch object.
- Ensure that the last batch is sent (flushed).
Basic import
The following example adds objects to your target collection. Replace the collection name in each snippet (`YourName` or `YourCollection`) with the name of your own
collection.
- Python Client v4
- Python Client v3
- JS/TS Client v3
- JS/TS Client v2
- Java
- Go
# Build five placeholder objects to import.
data_rows = [
    {"title": f"Object {i+1}"} for i in range(5)
]

collection = client.collections.get("YourCollection")

# Dynamic batching sizes the requests automatically; exiting the
# context manager flushes any remaining (partial) batch.
with collection.batch.dynamic() as batch:
    for data_row in data_rows:
        batch.add_object(
            properties=data_row,
        )
class_name = "YourName"  # Replace with your class name
data_objs = [
    {"title": f"Object {i+1}"} for i in range(5)
]

client.batch.configure(batch_size=100)  # Configure batch
# The context manager flushes the final (partial) batch on exit.
with client.batch as batch:
    for data_obj in data_objs:
        batch.add_data_object(
            data_obj,
            class_name,
            # tenant="tenantA"  # If multi-tenancy is enabled, specify the tenant to which the object will be added.
        )
const collection = 'myCollectionName'  // Replace with your collection name

// Build ten placeholder objects.
let dataObject = []
for (let i = 1; i <= 10; i++) {
  dataObject.push({ title: `Object ${i}`})
}

const myCollection = client.collections.get(collection)
// insertMany sends all queued objects in a single batch request.
const response = await myCollection.data.insertMany(dataObject);

console.log(response);
let className = 'YourName'; // Replace with your class name
let dataObjs = [];
for (let i = 1; i <= 5; i++)
  dataObjs.push({ title: `Object ${i}` }); // Replace with your actual objects

// Queue each object on the batcher, then send them all at once.
let batcher5 = client.batch.objectsBatcher();
for (const dataObj of dataObjs)
  batcher5 = batcher5.withObject({
    class: className,
    properties: dataObj,
    // tenant: 'tenantA' // If multi-tenancy is enabled, specify the tenant to which the object will be added.
  });

// Flush
await batcher5.do();
String className = "YourName";  // Replace with your class name
List<Map<String, Object>> dataObjs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
  Map<String, Object> properties = new HashMap<>();
  properties.put("title", String.format("Object %s", i)); // Replace with your actual objects
  dataObjs.add(properties);
}

// Queue each object on the batcher, then send them in one request.
ObjectsBatcher batcher = client.batch().objectsBatcher();
for (Map<String, Object> properties : dataObjs) {
  batcher.withObject(WeaviateObject.builder()
    .className(className)
    .properties(properties)
    // .tenant("tenantA")  // If multi-tenancy is enabled, specify the tenant to which the object will be added.
    .build()
  );
}

// Flush
batcher.run();
className := "YourName" // Replace with your class name
dataObjs := []models.PropertySchema{}
for i := 0; i < 5; i++ {
	dataObjs = append(dataObjs, map[string]interface{}{
		"title": fmt.Sprintf("Object %v", i), // Replace with your actual objects
	})
}

// Queue each object on the batcher, then send them in one request.
batcher := client.Batch().ObjectsBatcher()
for _, dataObj := range dataObjs {
	batcher.WithObjects(&models.Object{
		Class:      className,
		Properties: dataObj,
		// Tenant: "tenantA", // If multi-tenancy is enabled, specify the tenant to which the object will be added.
	})
}

// Flush
batcher.Do(ctx)
Specify an ID value
Weaviate generates a UUID for each object. Object IDs must be unique. If you set object IDs yourself, use one of these deterministic UUID methods to prevent duplicate IDs:
- generate_uuid5 (Python)
- generateUuid5 (TypeScript)
- Python Client v4
- Python Client v3
- JS/TS Client v3
- JS/TS Client v2
- Java
- Go
from weaviate.util import generate_uuid5  # Generate a deterministic ID

data_rows = [{"title": f"Object {i+1}"} for i in range(5)]

collection = client.collections.get("YourCollection")

with collection.batch.dynamic() as batch:
    for data_row in data_rows:
        # Derive the UUID from the object's content so re-importing
        # the same data yields the same ID (prevents duplicates).
        obj_uuid = generate_uuid5(data_row)
        batch.add_object(
            properties=data_row,
            uuid=obj_uuid
        )
from weaviate.util import generate_uuid5  # Generate a deterministic ID

class_name = "YourName"  # Replace with your class name
data_objs = [
    {"title": f"Object {i+1}"} for i in range(5)  # Replace with your actual objects
]

client.batch.configure(batch_size=100)  # Configure batch
with client.batch as batch:
    for data_obj in data_objs:
        batch.add_data_object(
            data_obj,
            class_name,
            # Content-derived UUID: identical input always maps to the same ID.
            uuid=generate_uuid5(data_obj)  # Optional: Specify an object ID
        )
import { generateUuid5 } from 'weaviate-client';  // requires v1.3.2+

const questions = client.collections.get("JeopardyQuestion")

let dataObject = []
for (let i = 1; i <= 10; i++) {
  dataObject.push({
    // Deterministic ID derived from collection name + content,
    // so repeated imports do not create duplicate objects.
    id: generateUuid5(questions.name, `Object ${i}`),
    properties: {
      title: `Object ${i}`
    }
  })
}

await questions.data.insertMany(dataObject)
import { generateUuid5 } from 'weaviate-ts-client';  // requires v1.3.2+

className = 'YourName'; // Replace with your class name
dataObjs = [];
for (let i = 1; i <= 5; i++)
  dataObjs.push({ title: `Object ${i}` }); // Replace with your actual objects

let batcherId = client.batch.objectsBatcher();
for (const dataObj of dataObjs)
  batcherId = batcherId.withObject({
    class: className,
    properties: dataObj,
    // Deterministic ID derived from the title, so re-imports are idempotent.
    id: generateUuid5(dataObj.title),
  });

// Flush
await batcherId.do();
String className = "YourName";  // Replace with your class name
List<Map<String, Object>> dataObjs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
  Map<String, Object> properties = new HashMap<>();
  properties.put("title", String.format("Object %s", i)); // Replace with your actual objects
  dataObjs.add(properties);
}

ObjectsBatcher batcher = client.batch().objectsBatcher();
for (Map<String, Object> properties : dataObjs) {
  batcher.withObject(WeaviateObject.builder()
    .className(className)
    .properties(properties)
    // Deterministic (name-based) UUID derived from the title bytes,
    // so importing the same title twice yields the same ID.
    .id(UUID.nameUUIDFromBytes(((String) properties.get("title")).getBytes()).toString())
    .build()
  );
}

// Flush
batcher.run();
// Deterministic UUID: derived from the MD5 of the lower-cased input,
// so importing the same title twice yields the same ID.
generateUUID := func(input string) strfmt.UUID {
	input = strings.ToLower(input)
	hash := md5.Sum([]byte(input))
	uuid := fmt.Sprintf("%x-%x-%x-%x-%x", hash[0:4], hash[4:6], hash[6:8], hash[8:10], hash[10:])
	return strfmt.UUID(uuid)
}

className := "YourName" // Replace with your class name
dataObjs := []models.PropertySchema{}
for i := 0; i < 5; i++ {
	dataObjs = append(dataObjs, map[string]interface{}{
		"title": fmt.Sprintf("Object %v", i), // Replace with your actual objects
	})
}

batcher := client.Batch().ObjectsBatcher()
for _, dataObj := range dataObjs {
	batcher.WithObjects(&models.Object{
		Class:      className,
		Properties: dataObj,
		// ID is derived from the object's title (deterministic).
		ID: generateUUID((dataObj.(map[string]interface{}))["title"].(string)),
	})
}

// Flush
batcher.Do(ctx)
Specify a vector
Use the vector
property to specify a vector for each object.
- Python Client v4
- Python Client v3
- JS/TS Client v3
- JS/TS Client v2
- Java
- Go
data_rows = [{"title": f"Object {i+1}"} for i in range(5)]
# One pre-computed vector per object (1536-dim placeholders here).
vectors = [[0.1] * 1536 for i in range(5)]

collection = client.collections.get("YourCollection")

with collection.batch.dynamic() as batch:
    for i, data_row in enumerate(data_rows):
        batch.add_object(
            properties=data_row,
            vector=vectors[i]  # Bring-your-own vector for this object
        )
class_name = "YourName"  # Replace with your class name
data_objs = [
    {"title": f"Object {i+1}"} for i in range(5)  # Replace with your actual objects
]
# One pre-computed vector per object, paired with data_objs by index.
vectors = [
    [0.25 + i/100] * 10 for i in range(5)  # Replace with your actual vectors
]

client.batch.configure(batch_size=100)  # Configure batch
with client.batch as batch:
    for i, data_obj in enumerate(data_objs):
        batch.add_data_object(
            data_obj,
            class_name,
            vector=vectors[i]  # Optional: Specify an object vector
        )
const questions = client.collections.get("JeopardyQuestion")

let dataObject = []
for (let i = 1; i <= 10; i++) {
  dataObject.push({
    properties: {
      title: `Object ${i}`
    },
    // Bring-your-own vector for this object (100-dim placeholder).
    vectors: Array(100).fill(0.25136 + i / 100)
  })
}

await questions.data.insertMany(dataObject)
className = 'YourName'; // Replace with your class name
dataObjs = [];
const vectors = [];
// Build objects and their vectors in parallel arrays, paired by index.
for (let i = 1; i <= 5; i++) {
  dataObjs.push({ title: `Object ${i}` }); // Replace with your actual objects
  vectors.push(Array(10).fill(0.25 + i / 100)); // Replace with your actual vectors
}

let batcherVectors = client.batch.objectsBatcher();
for (let i = 0; i < 5; i++)
  batcherVectors = batcherVectors.withObject({
    class: className,
    properties: dataObjs[i],
    vector: vectors[i],
  });

// Flush
await batcherVectors.do();
String className = "YourName";  // Replace with your class name
List<Map<String, Object>> dataObjs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
  Map<String, Object> properties = new HashMap<>();
  properties.put("title", String.format("Object %s", i)); // Replace with your actual objects
  dataObjs.add(properties);
}

// One vector per object, paired with dataObjs by index.
List<Float[]> vectors = new ArrayList<>();
for (int i = 0; i < 5; i++) {
  Float[] vector = new Float[10];
  Arrays.fill(vector, 0.25f + i / 100f);
  vectors.add(vector); // Replace with your actual vectors
}

ObjectsBatcher batcher = client.batch().objectsBatcher();
for (int i = 0; i < 5; i++) {
  batcher.withObject(WeaviateObject.builder()
    .className(className)
    .properties(dataObjs.get(i))
    .vector(vectors.get(i))
    .build()
  );
}

// Flush
batcher.run();
className := "YourName" // Replace with your class name
dataObjs := []models.PropertySchema{}
for i := 0; i < 5; i++ {
	dataObjs = append(dataObjs, map[string]interface{}{
		"title": fmt.Sprintf("Object %v", i), // Replace with your actual objects
	})
}

// One placeholder vector per object, paired with dataObjs by index.
// BUGFIX: the original used float32(j/100) — integer division, always 0 —
// so every vector was identical. Use float32(i)/100 so each object's
// vector differs, matching the other language examples on this page.
vectors := [][]float32{}
for i := 0; i < 5; i++ {
	vector := make([]float32, 10)
	for j := range vector {
		vector[j] = 0.25 + float32(i)/100 // Replace with your actual vectors
	}
	vectors = append(vectors, vector)
}

batcher := client.Batch().ObjectsBatcher()
for i, dataObj := range dataObjs {
	batcher.WithObjects(&models.Object{
		Class:      className,
		Properties: dataObj,
		Vector:     vectors[i],
	})
}

// Flush
batcher.Do(ctx)
Specify named vectors
v1.24
When you create an object, you can specify named vectors (if configured in your collection).
- Python Client v4
- Python Client v3
- JS/TS Client v3
- JS/TS Client v2
data_rows = [{
    "title": f"Object {i+1}",
    "body": f"Body {i+1}"
} for i in range(5)]
# One pre-computed vector per object for each named vector space.
title_vectors = [[0.12] * 1536 for _ in range(5)]
body_vectors = [[0.34] * 1536 for _ in range(5)]

collection = client.collections.get("YourCollection")

with collection.batch.dynamic() as batch:
    for i, data_row in enumerate(data_rows):
        batch.add_object(
            properties=data_row,
            # Keys must match the named vectors configured on the collection.
            vector={
                "title": title_vectors[i],
                "body": body_vectors[i],
            }
        )
# Unfortunately, named vectors are not supported in the v3 API / Python client.
# Please upgrade to the v4 API / Python client to use named vectors.
const questions = client.collections.get("JeopardyQuestion")

let dataObject = []
for (let i = 1; i <= 10; i++) {
  dataObject.push({
    properties: {
      title: `Object ${i}`
    },
    // Keys must match the named vectors configured on the collection.
    vectors: {
      title: Array(100).fill(0.25136 + i / 100),
      body: Array(100).fill(0.89137 + i / 100)
    }
  })
}

await questions.data.insertMany(dataObject)
className = 'YourCollection'; // Replace with your class name
dataObjs = [];
const title_vectors = [];
const body_vectors = [];
// Build objects plus one vector per named vector space, paired by index.
for (let i = 1; i <= 5; i++) {
  dataObjs.push({ title: `Object ${i}`, body: `Body ${i}` }); // Replace with your actual objects
  title_vectors.push(Array(10).fill(0.25 + i / 100)); // Replace with your actual vectors
  body_vectors.push(Array(10).fill(0.25 + i / 100)); // Replace with your actual vectors
}

let namedVectors = client.batch.objectsBatcher();
for (let i = 0; i < 5; i++)
  namedVectors = namedVectors.withObject({
    class: className,
    properties: dataObjs[i],
    // Keys must match the named vectors configured on the collection.
    vectors: {
      title: title_vectors[i],
      body: body_vectors[i]
    },
  });

// Flush
await namedVectors.do();
Import with references
You can batch create links from one object to another object through cross-references.
- Python Client v4
- Python Client v3
- JS/TS Client v2
collection = client.collections.get("Author")

# fixed_size flushes automatically every `batch_size` items.
with collection.batch.fixed_size(batch_size=100) as batch:
    batch.add_reference(
        from_property="writesFor",
        from_uuid=from_uuid,  # UUID of the source object (defined earlier)
        to=target_uuid,       # UUID of the target object (defined earlier)
    )
with client.batch as batch:
    # Link Author.wroteArticles -> Article, identified by UUIDs.
    batch.add_reference(
        from_object_uuid="36ddd591-2dee-4e7e-a3cc-eb86d30a4303",
        from_object_class_name="Author",
        from_property_name="wroteArticles",
        to_object_uuid="6bb06a43-e7f0-393e-9ecf-3c0f4e129064",
        to_object_class_name="Article",
        # tenant="tenantA",  # Optional; specify the tenant in multi-tenancy collections
    )
// Batch-create a cross-reference: Author.wroteArticles -> Article.
const response = await client.batch
  .referencesBatcher()
  .withReference(
    client.batch
      .referencePayloadBuilder()
      .withFromClassName('Author')
      .withFromRefProp('wroteArticles')
      .withFromId('36ddd591-2dee-4e7e-a3cc-eb86d30a4303')
      .withToClassName('Article') // prior to v1.14 omit .withToClassName()
      .withToId('6bb06a43-e7f0-393e-9ecf-3c0f4e129064')
      .payload()
  )
  // You can add multiple references
  // .withReference(
  //   client.batch
  //     .referencePayloadBuilder()
  //     .withFromClassName('Author')
  //     .withFromRefProp('wroteArticles')
  //     .withFromId('36ddd591-2dee-4e7e-a3cc-eb86d30a4303')
  //     .withToClassName('Article') // prior to v1.14 omit .withToClassName()
  //     .withToId('b72912b9-e5d7-304e-a654-66dc63c55b32')
  //     .payload()
  // )
  .withConsistencyLevel('ALL') // default QUORUM
  // .withTenant('tenantA') // Optional; specify the tenant in multi-tenancy collections
  .do();
console.log(JSON.stringify(response, null, 2));
Python-specific considerations
The Python clients have built-in batching methods to help you optimize import speed. For details, see the client documentation.
Stream data from large files
If your dataset is large, consider streaming the import to avoid out-of-memory issues.
- Python Client v4 - JSON
- Python Client v4 - CSV
- Python Client v3 - JSON
- Python Client v3 - CSV
- TypeScript - JSON
- TypeScript - CSV
import ijson

# Settings for displaying the import progress
counter = 0
interval = 100  # print progress every this many records; should be bigger than the batch_size

print("JSON streaming, to avoid running out of memory on large files...")
with client.batch.fixed_size(batch_size=200) as batch:
    with open("jeopardy_1k.json", "rb") as f:
        # ijson yields one parsed record at a time instead of loading
        # the whole JSON array into memory.
        objects = ijson.items(f, "item")
        for obj in objects:
            properties = {
                "question": obj["Question"],
                "answer": obj["Answer"],
            }
            batch.add_object(
                collection="JeopardyQuestion",
                properties=properties,
                # If you Bring Your Own Vectors, add the `vector` parameter here
                # vector=obj.vector["default"]
            )

            # Calculate and display progress
            counter += 1
            if counter % interval == 0:
                print(f"Imported {counter} articles...")

print(f"Finished importing {counter} articles.")
import pandas as pd

# Settings for displaying the import progress
counter = 0
interval = 100  # print progress every this many records; should be bigger than the batch_size

# BUGFIX: the original snippet built `properties` from an undefined name
# `obj` instead of the loop variable `row` (a NameError at runtime), and
# contained an unused `add_object` helper that opened a fresh batch
# context per object. The helper is removed and the loop reads from `row`.
print("pandas dataframe iterator with lazy-loading, to not load all records in RAM at once...")
with client.batch.fixed_size(batch_size=200) as batch:
    with pd.read_csv(
        "jeopardy_1k.csv",
        usecols=["Question", "Answer", "Category"],
        chunksize=100,  # number of rows per chunk
    ) as csv_iterator:
        # Iterate through the dataframe chunks and add each CSV record to the batch
        for chunk in csv_iterator:
            for index, row in chunk.iterrows():
                properties = {
                    "question": row["Question"],
                    "answer": row["Answer"],
                }
                batch.add_object(
                    collection="JeopardyQuestion",
                    properties=properties,
                    # If you Bring Your Own Vectors, add the `vector` parameter here
                    # vector=row.vector["default"]
                )

                # Calculate and display progress
                counter += 1
                if counter % interval == 0:
                    print(f"Imported {counter} articles...")

print(f"Finished importing {counter} articles.")
import weaviate
import ijson

# Settings for displaying the import progress
counter = 0
interval = 20  # print progress every this many records; should be bigger than the batch_size

def add_object(obj) -> None:
    """Queue one parsed JSON record for batch import."""
    global counter
    properties = {
        "question": obj["Question"],
        "answer": obj["Answer"],
    }

    client.batch.configure(batch_size=100)  # Configure batch
    # NOTE(review): the batch context manager is re-entered for every
    # object; the v3 client still groups sends by batch_size, but confirm
    # this is intended rather than one `with` around the whole loop.
    with client.batch as batch:
        # Add the object to the batch
        batch.add_data_object(
            data_object=properties,
            class_name="JeopardyQuestion",
            # If you Bring Your Own Vectors, add the `vector` parameter here
            # vector=obj.vector
        )

    # Calculate and display progress
    counter += 1
    if counter % interval == 0:
        print(f"Imported {counter} articles...")

print("JSON streaming, to avoid running out of memory on large files...")
with open("jeopardy_1k.json", "rb") as f:
    # ijson yields one record at a time instead of loading the whole file.
    objects = ijson.items(f, "item")
    for o in objects:
        add_object(o)

print(f"Finished importing {counter} articles.")
import weaviate
import pandas as pd

# Settings for displaying the import progress
counter = 0
interval = 20  # print progress every this many records; should be bigger than the batch_size

def add_object(obj) -> None:
    """Queue one CSV row (a pandas Series) for batch import."""
    global counter
    properties = {
        "question": obj["Question"],
        "answer": obj["Answer"],
    }

    client.batch.configure(batch_size=100)  # Configure batch
    # NOTE(review): the batch context manager is re-entered per row; the
    # v3 client still groups sends by batch_size — confirm this matches
    # the intended import pattern.
    with client.batch as batch:
        # Add the object to the batch
        batch.add_data_object(
            data_object=properties,
            class_name="JeopardyQuestion",
            # If you Bring Your Own Vectors, add the `vector` parameter here
            # vector=obj.vector
        )

    # Calculate and display progress
    counter += 1
    if counter % interval == 0:
        print(f"Imported {counter} articles...")

print("pandas dataframe iterator with lazy-loading, to not load all records in RAM at once...")
with pd.read_csv(
    "jeopardy_1k.csv",
    usecols=["Question", "Answer", "Category"],
    chunksize=100,  # number of rows per chunk
) as csv_iterator:
    # Iterate through the dataframe chunks and add each CSV record to the batch
    for chunk in csv_iterator:
        for index, row in chunk.iterrows():
            add_object(row)

print(f"Finished importing {counter} articles.")
import weaviate from 'weaviate-client';  // NOTE(review): the batcher API below is from the v2 client ('weaviate-ts-client') — confirm the package name
import fs from 'fs';
import parser from 'stream-json';
import StreamArray from 'stream-json/streamers/StreamArray';
import Chain from 'stream-chain';

let batcher = client.batch.objectsBatcher();
let counter = 0;
const batchSize = 20;

async function addObject(obj: object): Promise<void> {
  const properties = {
    question: obj['Question'],
    answer: obj['Answer'],
  };

  // Add the object to the batch queue
  batcher = batcher.withObject({
    class: 'JeopardyQuestion',
    properties,
    // If you Bring Your Own Vectors, add the `vector` parameter here
    // vector: JSON.parse(obj['Vector']),
  });
  counter++;

  // When the batch counter reaches batchSize, push the objects to Weaviate
  if (counter % batchSize === 0) {
    // Flush the batch queue and restart it
    const response = await batcher.do();
    batcher = client.batch.objectsBatcher();

    // Handle errors
    for (const r of response)
      if (r.result.errors)
        throw r.result.errors;

    console.log(`Imported ${counter} articles...`);
  }
}

async function importJson(filePath) {
  // Stream the JSON array element-by-element so the whole file never
  // has to fit in memory.
  const pipeline = new Chain([
    fs.createReadStream(filePath),
    parser(),
    new StreamArray(),
  ]);

  for await (const { value } of pipeline) {
    await addObject(value);
  }
}

await importJson('jeopardy_1k.json');

// Flush any remaining objects
if (batcher.payload().objects.length > 0)
  await batcher.do();

console.log(`Finished importing ${counter} articles.`);
import weaviate from 'weaviate-client';  // NOTE(review): the batcher API below is from the v2 client ('weaviate-ts-client') — confirm the package name
import fs from 'fs';
import csv from 'csv-parser';

let batcher = client.batch.objectsBatcher();
let counter = 0;
const batchSize = 20;

async function addObject(obj: object): Promise<void> {
  const properties = {
    question: obj['Question'],
    answer: obj['Answer'],
  };

  // Add the object to the batch queue
  batcher = batcher.withObject({
    class: 'JeopardyQuestion',
    properties,
    // If you Bring Your Own Vectors, add the `vector` parameter here
    // vector: JSON.parse(obj['Vector']),
  });
  counter++;

  // When the batch counter reaches batchSize, push the objects to Weaviate
  if (counter % batchSize === 0) {
    // Flush the batch queue and restart it
    const response = await batcher.do();
    batcher = client.batch.objectsBatcher();

    // Handle errors
    for (const r of response)
      if (r.result.errors)
        throw r.result.errors;

    console.log(`Imported ${counter} articles...`);
  }
}

async function importCSV(filePath) {
  // csv-parser streams the file row-by-row, keeping memory usage flat.
  const stream = fs.createReadStream(filePath).pipe(csv());
  for await (const row of stream) {
    await addObject(row);
  }
}

await importCSV('jeopardy_1k.csv');

// Flush any remaining objects
if (batcher.payload().objects.length > 0)
  await batcher.do();

console.log(`Finished importing ${counter} articles.`);
Batch vectorization
v1.25
Some model providers offer batch vectorization APIs, where each request can include multiple objects.
From Weaviate v1.25.0
, a batch import automatically makes use of the model providers' batch vectorization APIs where available. This reduces the number of requests to the model provider, improving throughput.
Model provider configurations
You can configure the batch vectorization settings for each model provider, such as the requests per minute or tokens per minute. The following example sets rate limits for the Cohere and OpenAI integrations, and provides API keys for both.
Note that each provider exposes different configuration options.
- Python (v4)
from weaviate.classes.config import Integrations

# Per-provider batch-vectorization settings (rate limits, API keys).
integrations = [
    # Each model provider may expose different parameters
    Integrations.cohere(
        api_key=cohere_key,
        requests_per_minute_embeddings=rpm_embeddings,
    ),
    Integrations.openai(
        api_key=openai_key,
        requests_per_minute_embeddings=rpm_embeddings,
        tokens_per_minute_embeddings=tpm_embeddings,  # e.g. OpenAI also exposes tokens per minute for embeddings
    ),
]
# Apply the settings to the connected client.
client.integrations.configure(integrations)
Additional considerations
Data imports can be resource intensive. Consider the following when you import large amounts of data.
Asynchronous imports
Available starting in v1.22. This is an experimental feature. Please use with caution.
To maximize import speed, enable asynchronous indexing.
To enable asynchronous indexing, set the ASYNC_INDEXING
environment variable to true
in your Weaviate configuration file.
weaviate:
  # Weaviate container image; pin to the version you run in production.
  image: cr.weaviate.io/semitechnologies/weaviate:1.25.0
  ...
  environment:
    # Enable asynchronous (non-blocking) indexing — experimental, v1.22+.
    ASYNC_INDEXING: 'true'
  ...
gRPC connection
v1.23
The gRPC API is faster than the REST API. Use the gRPC API to improve import speeds.
The Python client v4
and the TypeScript client v3 use gRPC.
Related pages
Questions and feedback
If you have any questions or feedback, let us know in the user forum.